Patches to mailman-2.1.18 to use FreeBSD "kqueue" for watching queue directories

This may be of greatest interest to FreeBSD & MacOS "port" maintainers. Since August I've been running these patches to mailman-2.1.18.1, which use "kqueue" to watch the queue directories. As you can see, the idle components don't accumulate any CPU time (they don't have to poll their queue directories):

mail% ps axwww | grep mailman
1067 - IWs 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/mailmanctl -s -q start
1068 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=ArchRunner:0:1 -s
1069 - I 0:02.76 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=BounceRunner:0:1 -s
1070 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=CommandRunner:0:1 -s
1071 - I 0:02.85 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=IncomingRunner:0:1 -s
1072 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=NewsRunner:0:1 -s
1073 - I 0:04.67 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=OutgoingRunner:0:1 -s
1074 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=VirginRunner:0:1 -s
1075 - IW 0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=RetryRunner:0:1 -s

[this particular set of processes has been running since Oct 5th]

It looks like the "inotify" facility on Linux might do the job, but it's not among Python's "included batteries" (yet).

I did a little bit of jiggering with the periodic wakeups; I can't remember at this point whether it was all "strictly necessary". I'd be happy to have the work picked up (but would appreciate credit for the initial work).

--- Runner.py.orig 2014-07-11 15:01:26.000000000 -0400 +++ Runner.py 2014-08-15 11:24:02.000000000 -0400 @@ -15,9 +15,14 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. -"""Generic queue runner class. +"""Generic queue runner class. 
(hacked for FreeBSD by phil@ultimate.com) """ +USE_KQUEUE = True + +import os # for kqueue +import select # for kqueue +import errno # for kqueue import time import traceback from cStringIO import StringIO @@ -44,6 +49,7 @@ class Runner: QDIR = None SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME + PERIODIC = 0 def __init__(self, slice=None, numslices=1): self._kids = {} @@ -53,6 +59,21 @@ # Create the shunt switchboard self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR) self._stop = False + # BEGIN phil@ultimate.com + self.kq = None + # See if BSD/OSX 'kqueue' can be used for dir watching: + if USE_KQUEUE and getattr(select, 'kqueue', None): + self._kqueue_init() + # Linux has inotify, but not among included batteries + # if other mechanisms implemented, should be hidden in a + # WatchDir() class, maybe should be in any case?! + + # pulled up from BounceMixin + if self.PERIODIC > 0: + self._nextaction = time.time() + self.PERIODIC + else: + self._nextaction = -1 + # END phil@ultimate.com def __repr__(self): return '<%s at %s>' % (self.__class__.__name__, id(self)) @@ -70,8 +91,8 @@ filecnt = self._oneloop() # Do the periodic work for the subclass. BAW: this # shouldn't be called here. There should be one more - # _doperiodic() call at the end of the _oneloop() loop. - self._doperiodic() + # _checkperiodic() call at the end of the _oneloop() loop. + self._checkperiodic() # If the stop flag is set, we're done. if self._stop: break @@ -146,7 +167,7 @@ self._switchboard.finish(filebase, preserve=True) # Other work we want to do each time through the loop Utils.reap(self._kids, once=True) - self._doperiodic() + self._checkperiodic() if self._shortcircuit(): break return len(files) @@ -243,15 +264,31 @@ """ raise NotImplementedError - def _doperiodic(self): - """Do some processing `every once in a while'. + # BEGIN phil@ultimate.com + def _checkperiodic(self): + """Called every once in a while both from the Runner's main loop, and + from the Runner's hash slice processing loop. 
To check if time + to run periodic processing. - Called every once in a while both from the Runner's main loop, and - from the Runner's hash slice processing loop. You can do whatever + If self.PERIODIC is positive, calls subclass _doperiodic() method + every self.PERIODIC seconds. + """ + if self.PERIODIC <= 0: + return + now = time.time() + if self._nextaction > now: + return + self._nextaction = now + self.PERIODIC + # Run the subclass _myperiodic method: + self._myperiodic() + + def _myperiodic(self): + """Do some processing `every once in a while' + (every self.PERIODIC seconds) You can do whatever special periodic processing you want here, and the return value is irrelevant. """ - pass + syslog('error', 'empty _myperiodic called') def _snooze(self, filecnt): """Sleep for a little while. @@ -263,8 +300,68 @@ """ if filecnt or self.SLEEPTIME <= 0: return + if USE_KQUEUE and self.kq: + self._kqueue_snooze() + return time.sleep(self.SLEEPTIME) + # BEGIN phil@ultimate.com + def _kqueue_init(self): + """Setup a kqueue fd listening for VNODE WRITE events in self.QDIR""" + self.kq = None + self.kq_dirfd = -1 + try: + self.kq = select.kqueue() + self.kq_dirfd = os.open(self.QDIR, os.O_RDONLY) + ev = select.kevent(self.kq_dirfd, + filter=select.KQ_FILTER_VNODE, + flags =select.KQ_EV_ADD|select.KQ_EV_CLEAR, + fflags=select.KQ_NOTE_WRITE) + # install ev, return no events, don't sleep + self.kq.control([ev], 0, 0) + return True + except Exception, e: + if self.kq: + self.kq.close() + self.kq = None + if self.kq_dirfd != -1: + os.close(self.kq_dirfd) + self.kq_dirfd = -1 + return False + + def _kqueue_snooze(self): + """Sleep until activity in self.QDIR + or time to call _myperiodic() + """ + wait = None # default to indefinite + if self.PERIODIC > 0 and self._nextaction > 0: + wait = self._nextaction - time.time() + # periodic processing overdue, just return + if wait < 0: + return + if USE_KQUEUE and self.kq: + try: + syslog('phil', 'calling kq.control wait=%s', wait) + 
# no changes, return at most 10 events (should only return 1) + events = self.kq.control([], 10, wait) + syslog('phil', 'kq.control returned %d', len(events)) + # ignore events!! + time.sleep(5.0) # TEMP!!!! + return + except KeyboardInterrupt: + raise + except OSError, exc: + # suppress EINTR, like time.sleep + if exc.errno == errno.EINTR: + return + syslog('phil', 'kq.control exception: %s', exc) + #raise + # fall thru..... + if wait is None or wait > self.SLEEPTIME: + wait = self.SLEEPTIME + time.sleep(wait) + # END phil@ultimate.com + def _shortcircuit(self): """Return a true value if the individual file processing loop should exit before it's finished processing each message in the current slice --- BounceRunner.py.orig 2014-07-11 15:01:26.000000000 -0400 +++ BounceRunner.py 2014-08-14 09:55:00.000000000 -0400 @@ -46,8 +46,18 @@ -class BounceMixin: - def __init__(self): +# phil@ultimate.com: the only users of BounceMixin +# used "Runner" as a base, so refactoring as BounceRunnerBase +# to avoid MRO end-runs (and added pain for PERIODIC), +# seems to require fewer explicit parent class calls +# (__init__, _cleanup) too! + +class BounceRunnerBase(Runner): + PERIODIC = mm_cfg.REGISTER_BOUNCES_EVERY + + def __init__(self, slice=None, numslices=1): + Runner.__init__(self, slice, numslices) + # Registering a bounce means acquiring the list lock, and it would be # too expensive to do this for each message. 
Instead, each bounce # runner maintains an event log which is essentially a file with @@ -84,7 +94,6 @@ mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid()) self._bounce_events_fp = None self._bouncecnt = 0 - self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY def _queue_bounces(self, listname, addrs, msg): today = time.localtime()[:3] @@ -135,14 +144,12 @@ def _cleanup(self): if self._bouncecnt > 0: self._register_bounces() + Runner._cleanup(self) - def _doperiodic(self): - now = time.time() - if self._nextaction > now or self._bouncecnt == 0: - return - # Let's go ahead and register the bounces we've got stored up - self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY - self._register_bounces() + def _myperiodic(self): + """called every self.PERIODIC seconds""" + if self._bouncecnt > 0: + self._register_bounces() def _probe_bounce(self, mlist, token): locked = mlist.Locked() @@ -161,13 +168,9 @@ -class BounceRunner(Runner, BounceMixin): +class BounceRunner(BounceRunnerBase): QDIR = mm_cfg.BOUNCEQUEUE_DIR - def __init__(self, slice=None, numslices=1): - Runner.__init__(self, slice, numslices) - BounceMixin.__init__(self) - def _dispose(self, mlist, msg, msgdata): # Make sure we have the most up-to-date state mlist.Load() @@ -258,14 +261,6 @@ # addrs = filter(None, addrs) # MAS above filter moved up so we don't try to queue an empty list. 
self._queue_bounces(mlist.internal_name(), addrs, msg) - - _doperiodic = BounceMixin._doperiodic - - def _cleanup(self): - BounceMixin._cleanup(self) - Runner._cleanup(self) - - def verp_bounce(mlist, msg): bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail()) --- OutgoingRunner.py.orig 2014-07-11 15:01:26.000000000 -0400 +++ OutgoingRunner.py 2014-07-26 11:42:18.000000000 -0400 @@ -31,12 +31,12 @@ from Mailman import LockFile from Mailman.Queue.Runner import Runner from Mailman.Queue.Switchboard import Switchboard -from Mailman.Queue.BounceRunner import BounceMixin +from Mailman.Queue.BounceRunner import BounceRunnerBase from Mailman.Logging.Syslog import syslog # This controls how often _doperiodic() will try to deal with deferred # permanent failures. It is a count of calls to _doperiodic() -DEAL_WITH_PERMFAILURES_EVERY = 10 +DEAL_WITH_PERMFAILURES_EVERY = 10 # PLB not used? try: True, False @@ -46,12 +46,11 @@ -class OutgoingRunner(Runner, BounceMixin): +class OutgoingRunner(BounceRunnerBase): QDIR = mm_cfg.OUTQUEUE_DIR def __init__(self, slice=None, numslices=1): - Runner.__init__(self, slice, numslices) - BounceMixin.__init__(self) + BounceRunnerBase.__init__(self, slice, numslices) # We look this function up only at startup time modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE mod = __import__(modname) @@ -131,9 +130,3 @@ self.__retryq.enqueue(msg, msgdata) # We've successfully completed handling of this message return False - - _doperiodic = BounceMixin._doperiodic - - def _cleanup(self): - BounceMixin._cleanup(self) - Runner._cleanup(self)

On Oct 31, 2014, at 02:53 PM, Phil Budne wrote:
I'd be supportive of a patch to MM3 to support kqueue/inotify. There appear to be several inotify modules available on PyPI, including at least two packaged in Debian (pyinotify looks to have both Python 2 and 3 support). Beyond that, I don't know which one is "better".
Would you be interested in putting together a merge proposal for MM3?
Cheers, -Barry

On Oct 31, 2014, at 02:53 PM, Phil Budne wrote:
I'd be supportive of a patch to MM3 to support kqueue/inotify. There appear to be several inotify modules available on PyPI, including at least two packaged in Debian (pyinotify looks to have both Python 2 and 3 support). Beyond that, I don't know which one is "better".
Would you be interested in putting together a merge proposal for MM3?
Cheers, -Barry
participants (2)
-
Barry Warsaw
-
Phil Budne