Patches to mailman-2.1.18 to use FreeBSD "kqueue"
This may be of greatest interest to FreeBSD & MacOS "port" maintainers.. I've been running these patches to mailman mailman-2.1.18.1 to use "kqueue" to watch the queue directories since August. As you can see the idle components don't accumulate any CPU time (they don't have to poll their queue directory): mail% ps axwww | grep mailman 1067 - IWs 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/mailmanctl -s -q start 1068 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=ArchRunner:0:1 -s 1069 - I 0:02.76 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=BounceRunner:0:1 -s 1070 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=CommandRunner:0:1 -s 1071 - I 0:02.85 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=IncomingRunner:0:1 -s 1072 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=NewsRunner:0:1 -s 1073 - I 0:04.67 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=OutgoingRunner:0:1 -s 1074 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=VirginRunner:0:1 -s 1075 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=RetryRunner:0:1 -s [this particular set of processes have been running since Oct 5th] It looks like the "inotify" facility on Linux might do the job, but it's not among Python's "included batteries" (yet) I did a little bit of jiggering to periodic wakeups, I can't remember at this point if it was all "strictly necessary".. I'd be happy to have the work picked up (but would appreciate credit for the initial work). --- Runner.py.orig 2014-07-11 15:01:26.000000000 -0400 +++ Runner.py 2014-08-15 11:24:02.000000000 -0400 @@ -15,9 +15,14 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. -"""Generic queue runner class. +"""Generic queue runner class. 
(hacked for FreeBSD by phil@ultimate.com) """ +USE_KQUEUE = True + +import os # for kqueue +import select # for kqueue +import errno # for kqueue import time import traceback from cStringIO import StringIO @@ -44,6 +49,7 @@ class Runner: QDIR = None SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME + PERIODIC = 0 def __init__(self, slice=None, numslices=1): self._kids = {} @@ -53,6 +59,21 @@ # Create the shunt switchboard self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR) self._stop = False + # BEGIN phil@ultimate.com + self.kq = None + # See if BSD/OSX 'kqueue' can be used for dir watching: + if USE_KQUEUE and getattr(select, 'kqueue', None): + self._kqueue_init() + # Linux has inotify, but not among included batteries + # if other mechanisms implemented, should be hidden in a + # WatchDir() class, maybe should be in any case?! + + # pulled up from BounceMixin + if self.PERIODIC > 0: + self._nextaction = time.time() + self.PERIODIC + else: + self._nextaction = -1 + # END phil@ultimate.com def __repr__(self): return '<%s at %s>' % (self.__class__.__name__, id(self)) @@ -70,8 +91,8 @@ filecnt = self._oneloop() # Do the periodic work for the subclass. BAW: this # shouldn't be called here. There should be one more - # _doperiodic() call at the end of the _oneloop() loop. - self._doperiodic() + # _checkperiodic() call at the end of the _oneloop() loop. + self._checkperiodic() # If the stop flag is set, we're done. if self._stop: break @@ -146,7 +167,7 @@ self._switchboard.finish(filebase, preserve=True) # Other work we want to do each time through the loop Utils.reap(self._kids, once=True) - self._doperiodic() + self._checkperiodic() if self._shortcircuit(): break return len(files) @@ -243,15 +264,31 @@ """ raise NotImplementedError - def _doperiodic(self): - """Do some processing `every once in a while'. + # BEGIN phil@ultimate.com + def _checkperiodic(self): + """Called every once in a while both from the Runner's main loop, and + from the Runner's hash slice processing loop. 
To check if time + to run periodic processing. - Called every once in a while both from the Runner's main loop, and - from the Runner's hash slice processing loop. You can do whatever + If self.PERIODIC is positive, calls subclass _doperiodic() method + every self.PERIODIC seconds. + """ + if self.PERIODIC <= 0: + return + now = time.time() + if self._nextaction > now: + return + self._nextaction = now + self.PERIODIC + # Run the subclass _myperiodic method: + self._myperiodic() + + def _myperiodic(self): + """Do some processing `every once in a while' + (every self.PERIODIC seconds) You can do whatever special periodic processing you want here, and the return value is irrelevant. """ - pass + syslog('error', 'empty _myperiodic called') def _snooze(self, filecnt): """Sleep for a little while. @@ -263,8 +300,68 @@ """ if filecnt or self.SLEEPTIME <= 0: return + if USE_KQUEUE and self.kq: + self._kqueue_snooze() + return time.sleep(self.SLEEPTIME) + # BEGIN phil@ultimate.com + def _kqueue_init(self): + """Setup a kqueue fd listening for VNODE WRITE events in self.QDIR""" + self.kq = None + self.kq_dirfd = -1 + try: + self.kq = select.kqueue() + self.kq_dirfd = os.open(self.QDIR, os.O_RDONLY) + ev = select.kevent(self.kq_dirfd, + filter=select.KQ_FILTER_VNODE, + flags =select.KQ_EV_ADD|select.KQ_EV_CLEAR, + fflags=select.KQ_NOTE_WRITE) + # install ev, return no events, don't sleep + self.kq.control([ev], 0, 0) + return True + except Exception, e: + if self.kq: + self.kq.close() + self.kq = None + if self.kq_dirfd != -1: + os.close(self.kq_dirfd) + self.kq_dirfd = -1 + return False + + def _kqueue_snooze(self): + """Sleep until activity in self.QDIR + or time to call _myperiodic() + """ + wait = None # default to indefinite + if self.PERIODIC > 0 and self._nextaction > 0: + wait = self._nextaction - time.time() + # periodic processing overdue, just return + if wait < 0: + return + if USE_KQUEUE and self.kq: + try: + syslog('phil', 'calling kq.control wait=%s', wait) + 
# no changes, return at most 10 events (should only return 1) + events = self.kq.control([], 10, wait) + syslog('phil', 'kq.control returned %d', len(events)) + # ignore events!! + time.sleep(5.0) # TEMP!!!! + return + except KeyboardInterrupt: + raise + except OSError, exc: + # suppress EINTR, like time.sleep + if exc.errno == errno.EINTR: + return + syslog('phil', 'kq.control exception: %s', exc) + #raise + # fall thru..... + if wait is None or wait > self.SLEEPTIME: + wait = self.SLEEPTIME + time.sleep(wait) + # END phil@ultimate.com + def _shortcircuit(self): """Return a true value if the individual file processing loop should exit before it's finished processing each message in the current slice --- BounceRunner.py.orig 2014-07-11 15:01:26.000000000 -0400 +++ BounceRunner.py 2014-08-14 09:55:00.000000000 -0400 @@ -46,8 +46,18 @@ -class BounceMixin: - def __init__(self): +# phil@ultimate.com: the only users of BounceMixin +# used "Runner" as a base, so refactoring as BounceRunnerBase +# to avoid MRO end-runs (and added pain for PERIODIC), +# seems to require fewer explicit parent class calls +# (__init__, _cleanup) too! + +class BounceRunnerBase(Runner): + PERIODIC = mm_cfg.REGISTER_BOUNCES_EVERY + + def __init__(self, slice=None, numslices=1): + Runner.__init__(self, slice, numslices) + # Registering a bounce means acquiring the list lock, and it would be # too expensive to do this for each message. 
Instead, each bounce # runner maintains an event log which is essentially a file with @@ -84,7 +94,6 @@ mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid()) self._bounce_events_fp = None self._bouncecnt = 0 - self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY def _queue_bounces(self, listname, addrs, msg): today = time.localtime()[:3] @@ -135,14 +144,12 @@ def _cleanup(self): if self._bouncecnt > 0: self._register_bounces() + Runner._cleanup(self) - def _doperiodic(self): - now = time.time() - if self._nextaction > now or self._bouncecnt == 0: - return - # Let's go ahead and register the bounces we've got stored up - self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY - self._register_bounces() + def _myperiodic(self): + """called every self.PERIODIC seconds""" + if self._bouncecnt > 0: + self._register_bounces() def _probe_bounce(self, mlist, token): locked = mlist.Locked() @@ -161,13 +168,9 @@ -class BounceRunner(Runner, BounceMixin): +class BounceRunner(BounceRunnerBase): QDIR = mm_cfg.BOUNCEQUEUE_DIR - def __init__(self, slice=None, numslices=1): - Runner.__init__(self, slice, numslices) - BounceMixin.__init__(self) - def _dispose(self, mlist, msg, msgdata): # Make sure we have the most up-to-date state mlist.Load() @@ -258,14 +261,6 @@ # addrs = filter(None, addrs) # MAS above filter moved up so we don't try to queue an empty list. 
self._queue_bounces(mlist.internal_name(), addrs, msg) - - _doperiodic = BounceMixin._doperiodic - - def _cleanup(self): - BounceMixin._cleanup(self) - Runner._cleanup(self) - - def verp_bounce(mlist, msg): bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail()) --- OutgoingRunner.py.orig 2014-07-11 15:01:26.000000000 -0400 +++ OutgoingRunner.py 2014-07-26 11:42:18.000000000 -0400 @@ -31,12 +31,12 @@ from Mailman import LockFile from Mailman.Queue.Runner import Runner from Mailman.Queue.Switchboard import Switchboard -from Mailman.Queue.BounceRunner import BounceMixin +from Mailman.Queue.BounceRunner import BounceRunnerBase from Mailman.Logging.Syslog import syslog # This controls how often _doperiodic() will try to deal with deferred # permanent failures. It is a count of calls to _doperiodic() -DEAL_WITH_PERMFAILURES_EVERY = 10 +DEAL_WITH_PERMFAILURES_EVERY = 10 # PLB not used? try: True, False @@ -46,12 +46,11 @@ -class OutgoingRunner(Runner, BounceMixin): +class OutgoingRunner(BounceRunnerBase): QDIR = mm_cfg.OUTQUEUE_DIR def __init__(self, slice=None, numslices=1): - Runner.__init__(self, slice, numslices) - BounceMixin.__init__(self) + BounceRunnerBase.__init__(self, slice, numslices) # We look this function up only at startup time modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE mod = __import__(modname) @@ -131,9 +130,3 @@ self.__retryq.enqueue(msg, msgdata) # We've successfully completed handling of this message return False - - _doperiodic = BounceMixin._doperiodic - - def _cleanup(self): - BounceMixin._cleanup(self) - Runner._cleanup(self)
On Oct 31, 2014, at 02:53 PM, Phil Budne wrote:
This may be of greatest interest to FreeBSD and macOS "port" maintainers. I've been running these patches against mailman-2.1.18.1, using "kqueue" to watch the queue directories, since August.
It looks like the "inotify" facility on Linux might do the job, but it's not among Python's "included batteries" (yet).
I'd be supportive of a patch to MM3 to support kqueue/inotify. There appear to be several inotify modules available on PyPI, including at least two packaged in Debian (pyinotify appears to support both Python 2 and 3). Beyond that, I don't know which one is "better".
Would you be interested in putting together a merge proposal for MM3?
Cheers, -Barry
Participants (2):
- Barry Warsaw
- Phil Budne