[Mailman-Developers] Patches to mailman-2.1.18 to use FreeBSD "kqueue"
Phil Budne
phil at ultimate.com
Fri Oct 31 19:53:25 CET 2014
This may be of greatest interest to FreeBSD & MacOS "port"
maintainers. Since August I've been running these patches to
mailman-2.1.18.1, which make it use "kqueue" to watch the queue
directories.
As you can see, the idle components don't accumulate any CPU time
(they don't have to poll their queue directories):
mail% ps axwww | grep mailman
1067 - IWs 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/mailmanctl -s -q start
1068 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=ArchRunner:0:1 -s
1069 - I 0:02.76 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=BounceRunner:0:1 -s
1070 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=CommandRunner:0:1 -s
1071 - I 0:02.85 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=IncomingRunner:0:1 -s
1072 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=NewsRunner:0:1 -s
1073 - I 0:04.67 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=OutgoingRunner:0:1 -s
1074 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=VirginRunner:0:1 -s
1075 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=RetryRunner:0:1 -s
[this particular set of processes has been running since Oct 5th]
It looks like the "inotify" facility on Linux might do the job, but
it's not among Python's "included batteries" (yet).
I also did a little bit of jiggering of the periodic wakeups; I can't
remember at this point whether it was all "strictly necessary". I'd be
happy to have the work picked up (but would appreciate credit for the
initial work).
--- Runner.py.orig 2014-07-11 15:01:26.000000000 -0400
+++ Runner.py 2014-08-15 11:24:02.000000000 -0400
@@ -15,9 +15,14 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
-"""Generic queue runner class.
+"""Generic queue runner class. (hacked for FreeBSD by phil at ultimate.com)
"""
+USE_KQUEUE = True # False forces plain time.sleep() polling in _snooze()
+
+import os # for kqueue
+import select # for kqueue
+import errno # for kqueue
import time
import traceback
from cStringIO import StringIO
@@ -44,6 +49,7 @@
class Runner:
QDIR = None
SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME
+ PERIODIC = 0 # seconds between _myperiodic() calls; <= 0 disables them
def __init__(self, slice=None, numslices=1):
self._kids = {}
@@ -53,6 +59,21 @@
# Create the shunt switchboard
self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
self._stop = False
+ # BEGIN phil at ultimate.com
+ self.kq = None
+ # See if BSD/OSX 'kqueue' can be used for dir watching:
+ if USE_KQUEUE and getattr(select, 'kqueue', None):
+ self._kqueue_init()
+ # Linux has inotify, but Python doesn't ship a binding for it.
+ # If other mechanisms get implemented they should be hidden behind a
+ # WatchDir()-style class (arguably they should be in any case).
+
+ # Deadline for the next _myperiodic() call (pulled up from BounceMixin); -1 = none
+ if self.PERIODIC > 0:
+ self._nextaction = time.time() + self.PERIODIC
+ else:
+ self._nextaction = -1
+ # END phil at ultimate.com
def __repr__(self):
return '<%s at %s>' % (self.__class__.__name__, id(self))
@@ -70,8 +91,8 @@
filecnt = self._oneloop()
# Do the periodic work for the subclass. BAW: this
# shouldn't be called here. There should be one more
- # _doperiodic() call at the end of the _oneloop() loop.
- self._doperiodic()
+ # _checkperiodic() call at the end of the _oneloop() loop.
+ self._checkperiodic()
# If the stop flag is set, we're done.
if self._stop:
break
@@ -146,7 +167,7 @@
self._switchboard.finish(filebase, preserve=True)
# Other work we want to do each time through the loop
Utils.reap(self._kids, once=True)
- self._doperiodic()
+ self._checkperiodic()
if self._shortcircuit():
break
return len(files)
@@ -243,15 +264,31 @@
"""
raise NotImplementedError
- def _doperiodic(self):
- """Do some processing `every once in a while'.
+ # BEGIN phil at ultimate.com
+ def _checkperiodic(self):
+ """Call _myperiodic() if the periodic-work deadline has passed.
+ Called every once in a while both from the Runner's main loop and
+ from the Runner's hash slice processing loop.
- Called every once in a while both from the Runner's main loop, and
- from the Runner's hash slice processing loop. You can do whatever
+ If self.PERIODIC is positive, calls the subclass _myperiodic() method
+ at most once every self.PERIODIC seconds; otherwise does nothing.
+ """
+ if self.PERIODIC <= 0:
+ return
+ now = time.time()
+ if self._nextaction > now:
+ return
+ self._nextaction = now + self.PERIODIC
+ # Run the subclass hook; the base-class version only logs an error.
+ self._myperiodic()
+
+ def _myperiodic(self):
+ """Do some processing `every once in a while'
+ (about every self.PERIODIC seconds). Override this; you can do whatever
special periodic processing you want here, and the return value is
irrelevant.
"""
- pass
+ syslog('error', 'empty _myperiodic called') # PERIODIC > 0 but not overridden
def _snooze(self, filecnt):
"""Sleep for a little while.
@@ -263,8 +300,75 @@
"""
if filecnt or self.SLEEPTIME <= 0:
return
+ if USE_KQUEUE and self.kq:
+ self._kqueue_snooze()
+ return
time.sleep(self.SLEEPTIME)
+ # BEGIN phil at ultimate.com
+ def _kqueue_init(self):
+ """Setup a kqueue fd listening for VNODE WRITE events in self.QDIR.
+
+ Returns True on success. On failure, closes whatever was opened,
+ leaves self.kq as None (so _snooze() falls back to time.sleep()),
+ logs why, and returns False.
+ """
+ self.kq = None
+ self.kq_dirfd = -1
+ try:
+ self.kq = select.kqueue()
+ self.kq_dirfd = os.open(self.QDIR, os.O_RDONLY)
+ ev = select.kevent(self.kq_dirfd,
+ filter=select.KQ_FILTER_VNODE,
+ flags=select.KQ_EV_ADD | select.KQ_EV_CLEAR,
+ fflags=select.KQ_NOTE_WRITE)
+ # install ev, return no events, don't sleep
+ self.kq.control([ev], 0, 0)
+ return True
+ except Exception, e:
+ if self.kq:
+ self.kq.close()
+ self.kq = None
+ if self.kq_dirfd != -1:
+ os.close(self.kq_dirfd)
+ self.kq_dirfd = -1
+ # Don't fail silently: record why we are polling instead.
+ syslog('error', 'kqueue setup for %s failed, polling instead: %s',
+ self.QDIR, e)
+ return False
+
+ def _kqueue_snooze(self):
+ """Sleep until activity in self.QDIR
+ or time to call _myperiodic()
+ """
+ wait = None # default to indefinite
+ if self.PERIODIC > 0 and self._nextaction > 0:
+ wait = self._nextaction - time.time()
+ # periodic processing overdue, just return
+ if wait < 0:
+ return
+ if USE_KQUEUE and self.kq:
+ try:
+ # No changes; return at most 10 events (should only return 1).
+ # The events are deliberately ignored: any wakeup just means
+ # "rescan QDIR", which the caller's next loop iteration does.
+ # NOTE(review): with wait=None, a stop signal that lands just
+ # before this call appears to go unnoticed until the next event.
+ self.kq.control([], 10, wait)
+ return
+ except KeyboardInterrupt:
+ raise
+ except OSError, exc:
+ # suppress EINTR, like time.sleep
+ if exc.errno == errno.EINTR:
+ return
+ syslog('error', 'kq.control failed for %s: %s', self.QDIR, exc)
+ # fall through to a bounded plain sleep
+ if wait is None or wait > self.SLEEPTIME:
+ wait = self.SLEEPTIME
+ time.sleep(wait)
+ # END phil at ultimate.com
+
def _shortcircuit(self):
"""Return a true value if the individual file processing loop should
exit before it's finished processing each message in the current slice
--- BounceRunner.py.orig 2014-07-11 15:01:26.000000000 -0400
+++ BounceRunner.py 2014-08-14 09:55:00.000000000 -0400
@@ -46,8 +46,18 @@
-class BounceMixin:
- def __init__(self):
+# phil at ultimate.com: every in-tree user of BounceMixin also derived
+# from Runner, so it is refactored as BounceRunnerBase(Runner). This
+# avoids MRO end-runs (and extra plumbing for PERIODIC) and removes the
+# explicit calls to both parents' __init__() and _cleanup().
+# NOTE(review): out-of-tree code importing BounceMixin will need updating.
+
+class BounceRunnerBase(Runner):
+ PERIODIC = mm_cfg.REGISTER_BOUNCES_EVERY # drives _myperiodic() via Runner._checkperiodic()
+
+ def __init__(self, slice=None, numslices=1):
+ Runner.__init__(self, slice, numslices)
+
# Registering a bounce means acquiring the list lock, and it would be
# too expensive to do this for each message. Instead, each bounce
# runner maintains an event log which is essentially a file with
@@ -84,7 +94,6 @@
mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid())
self._bounce_events_fp = None
self._bouncecnt = 0
- self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY
def _queue_bounces(self, listname, addrs, msg):
today = time.localtime()[:3]
@@ -135,14 +144,12 @@
def _cleanup(self):
if self._bouncecnt > 0:
self._register_bounces()
+ Runner._cleanup(self)
- def _doperiodic(self):
- now = time.time()
- if self._nextaction > now or self._bouncecnt == 0:
- return
- # Let's go ahead and register the bounces we've got stored up
- self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY
- self._register_bounces()
+ def _myperiodic(self):
+ """Every self.PERIODIC seconds, register any bounces queued so far."""
+ if self._bouncecnt > 0:
+ self._register_bounces()
def _probe_bounce(self, mlist, token):
locked = mlist.Locked()
@@ -161,13 +168,9 @@
-class BounceRunner(Runner, BounceMixin):
+class BounceRunner(BounceRunnerBase):
QDIR = mm_cfg.BOUNCEQUEUE_DIR
- def __init__(self, slice=None, numslices=1):
- Runner.__init__(self, slice, numslices)
- BounceMixin.__init__(self)
-
def _dispose(self, mlist, msg, msgdata):
# Make sure we have the most up-to-date state
mlist.Load()
@@ -258,14 +261,6 @@
# addrs = filter(None, addrs)
# MAS above filter moved up so we don't try to queue an empty list.
self._queue_bounces(mlist.internal_name(), addrs, msg)
-
- _doperiodic = BounceMixin._doperiodic
-
- def _cleanup(self):
- BounceMixin._cleanup(self)
- Runner._cleanup(self)
-
-
def verp_bounce(mlist, msg):
bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail())
--- OutgoingRunner.py.orig 2014-07-11 15:01:26.000000000 -0400
+++ OutgoingRunner.py 2014-07-26 11:42:18.000000000 -0400
@@ -31,12 +31,12 @@
from Mailman import LockFile
from Mailman.Queue.Runner import Runner
from Mailman.Queue.Switchboard import Switchboard
-from Mailman.Queue.BounceRunner import BounceMixin
+from Mailman.Queue.BounceRunner import BounceRunnerBase
from Mailman.Logging.Syslog import syslog
# This controls how often _doperiodic() will try to deal with deferred
# permanent failures. It is a count of calls to _doperiodic()
-DEAL_WITH_PERMFAILURES_EVERY = 10
+DEAL_WITH_PERMFAILURES_EVERY = 10 # PLB: appears unused (comment above predates the _doperiodic() removal); verify before deleting
try:
True, False
@@ -46,12 +46,11 @@
-class OutgoingRunner(Runner, BounceMixin):
+class OutgoingRunner(BounceRunnerBase):
QDIR = mm_cfg.OUTQUEUE_DIR
def __init__(self, slice=None, numslices=1):
- Runner.__init__(self, slice, numslices)
- BounceMixin.__init__(self)
+ BounceRunnerBase.__init__(self, slice, numslices)
# We look this function up only at startup time
modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE
mod = __import__(modname)
@@ -131,9 +130,3 @@
self.__retryq.enqueue(msg, msgdata)
# We've successfully completed handling of this message
return False
-
- _doperiodic = BounceMixin._doperiodic
-
- def _cleanup(self):
- BounceMixin._cleanup(self)
- Runner._cleanup(self)
More information about the Mailman-Developers
mailing list