using kernel events to watch queue directories
Some time ago I did a (crude) patch to mailman v2 to use FreeBSD "kqueue" to allow runners to wake up immediately when a file was queued, enabling most runners to sleep indefinitely, keeping overhead on my small Virtual Private Server to a minimum. I offered the patch to the FreeBSD mailman "port" maintainer, but they only expressed interest in a mailman3 patch. Well, I've recently/finally switched over to mailman v3, and I just did what I hope is a cleaner version of the change. I didn't see a PyPI library with cross-platform support that didn't want to usurp the main loop, and since I already had the kqueue code, I wrote the simplest possible "DirWatcher" class family. The change aims to "first, do no harm": it will always fall back to using sleep. It will also use the Linux "inotify" facility via the PyPI inotify_simple package (if installed), which I debugged "in the lab". NOTE!! I'm almost certainly seeing signal handling problems when I try to shut down mailman (under FreeBSD), so the changes should not be considered ready for production in any way! I'm not a zope interfaces user, nor a student of mailman project conventions, so again, I'm not asserting that the code, as is, is ready for integration. I AM interested in knowing if there's any interest in this, before I bother setting myself up on gitlab and bringing the change forward from 3.3.1. Something I consider an open issue is that right now I've changed the default sleep_time in schema.cfg from 1 second to 15 minutes, which is FAR less than ideal if the code falls back to (blind) sleep between directory scans. I considered looking to see if the current instance's class had overridden _do_periodic from the base Runner class, and if not, sleeping indefinitely (or at least a "long time"), but that seemed like a hack, and for testing, it was easier to not write any ugly code, and just whack schema.cfg. Since the changes are small, I'm including them in-line below.
==== core/dirwatch.py """ Simple O/S independent Class to sleep until a single directory changes (written for Mailman) Phil Budne <phil@regressive.org> August 5, 2021 """ import errno # kqueue import os # kqueue import select # kqueue import time # kqueue, base from mailman.interfaces.dirwatch import IDirWatcher from public import public from zope.interface import implementer _alternatives = [] # list of classes to try DEBUG = False # TEMP!!! @public @implementer(IDirWatcher) class BaseDirWatcher: """See `IDirWatch`.""" USE = True def __init__(self, directory): """Create a dirwatch object. :param directory: The directory to watch. """ self.directory = directory # TEMP!!! if DEBUG: self.log = open("/tmp/dirwatch.log", "a") def watch(self, float_sec): """See `IDirWatcher`.""" self.debug("BaseDirWatcher sleeping %s", float_sec) # TEMP time.sleep(float_sec) def debug(self, format, *args): # TEMP!!! if DEBUG: self.log.write(("%s [%d] " + format + "\n") % ((time.time(), os.getpid(),) + args)) self.log.flush() @public @implementer(IDirWatcher) class KQueueDirWatcher(BaseDirWatcher): def __init__(self, directory): super().__init__(directory) self._kq = self._kq_dirfd = None self._kq = select.kqueue() self._kq_dirfd = os.open(self.directory, os.O_RDONLY) ev = select.kevent(self._kq_dirfd, filter=select.KQ_FILTER_VNODE, flags =select.KQ_EV_ADD|select.KQ_EV_CLEAR, fflags=select.KQ_NOTE_WRITE) # install ev, return no events, don't sleep self._kq.control([ev], 0, 0) def watch(self, float_sec): self.debug("kq.watch %s %s", self.USE, self._kq) # TEMP if self.USE and self._kq: try: self.debug('calling kq.control wait=%s', float_sec) # TEMP # no changes to events, return at most 10 events # (should only ever return 1?) 
events = self._kq.control([], 10, float_sec) self.debug('kq.control returned %d event(s)', len(events)) # TEMP # ignore returned events return except KeyboardInterrupt: raise except OSError as exc: # suppress EINTR, like time.sleep if exc.errno == errno.EINTR: return self.debug('kq.control exception: %s', exc) # TEMP super().sleep(float_sec) _alternatives.append(KQueueDirWatcher) # Linux inotify not an included battery. # Ubuntu (and debian?) have pyinotify (old/unmaintained?) which wants # to take over the main loop, and do callbacks. If one felt # strongly about it, one could create a pyinotify.Notifier # object with a timeout value set, create an event handler that # sets a bool in the DirWatcher object, and have watch call # {check,read,process}_events (or something like that). But # inotify_simple is so much .... simpler. Since determining # what's available happens at run time, you could have BOTH! try: import inotify_simple @public @implementer(IDirWatcher) class INotifySimpleDirWatcher(BaseDirWatcher): def __init__(self, directory): super().__init__(directory) self.inotify = inotify_simple.INotify() # just look for renames self.inotify.add_watch(directory, inotify_simple.masks.MOVE) self.debug('inotify_simple init succeeded') # TEMP def watch(self, float_seconds): self.debug("inotify.watch %s %s", self.USE, self.inotify) # TEMP if self.USE and self.inotify: try: events = self.inotify.read(int(float_seconds*1000 + 0.5)) self.debug("inotify_simple read returned %d", len(events)) # TEMP return except KeyboardInterrupt: raise except OSError as exc: print(exc) # suppress EINTR, like time.sleep if exc.errno == errno.EINTR: return except Exception as exc: pass super().watch(float_seconds) _alternatives.append(INotifySimpleDirWatcher) except: pass def factory(directory): for dw_class in _alternatives: if dw_class.USE: try: return dw_class(directory) except: self.debug("%s init failed", dw_class) # TEMP dw_class.USE = False return BaseDirWatcher(directory) if 
__name__ == '__main__': watcher = factory('.') watcher.debug("got %s", watcher) # TEMP # enough time to type "touch foo" (and "mv foo bar" on Linux) watcher.watch(20.0) ==== interfaces/dirwatch.py """Interface for directory watchers.""" from public import public from zope.interface import Interface @public class IDirWatcher(Interface): """The directory watcher.""" def watch(self, float_sec): """Watch for new files in directory, for at most float_sec seconds. No value returned. """ ==== diffs --- config/schema.cfg-mm3.3.1 2021-07-07 17:24:59.000000000 -0400 +++ config/schema.cfg 2021-08-05 20:53:06.552912000 -0400 @@ -256,7 +256,9 @@ # The sleep interval for the runner. It wakes up once every interval to # process the files in its slice of the queue directory. Some runners may # ignore this. -sleep_time: 1s +# PLB: Using dirwatch, don't need to poll. unless runner has +# _do_periodic work. +sleep_time: 15m [database] --- core/runner.py-mm3.3.1 2020-04-10 01:53:06.000000000 -0400 +++ core/runner.py 2021-08-05 17:36:41.769775000 -0400 @@ -293,7 +293,10 @@ """See `IRunner`.""" if filecnt or self.sleep_float <= 0: return - time.sleep(self.sleep_float) + if self.switchboard: + self.switchboard.snooze(self.sleep_float) + else: + time.sleep(self.sleep_float) def _short_circuit(self): """See `IRunner`.""" --- interfaces/switchboard.py-mm3.3.1 2020-04-10 01:53:06.000000000 -0400 +++ interfaces/switchboard.py 2021-08-05 17:47:46.712414000 -0400 @@ -84,3 +84,6 @@ time, so moving them is enough to ensure that a normal dequeing operation will handle them. """ + + def snooze(self, sleep_float): + """Wait at most sleep_float seconds for a change to the queue."""
participants (1)
-
Phil Budne