Some time ago I wrote a (crude) patch for Mailman v2 that used FreeBSD
"kqueue" to let runners wake up immediately when a file was queued.
This allowed most runners to sleep indefinitely, keeping overhead on my
small Virtual Private Server to a minimum. I offered the patch to the
maintainer of the FreeBSD mailman "port", but they were only interested
in a Mailman 3 patch.
Well, I've recently (finally!) switched over to Mailman v3, and I just
did what I hope is a cleaner version of the change. I didn't see a
PyPI library with cross-platform support that didn't want to usurp the
main loop, and since I already had the kqueue code, I wrote the
simplest possible "DirWatcher" class family. The change aims to
"first, do no harm": it always falls back to a plain sleep.
It will also use the Linux "inotify" facility via the PyPI
inotify_simple package (if installed); I debugged that support
"in the lab".
NOTE!! I'm almost certainly seeing signal handling problems when I try
to shut down mailman (under FreeBSD), so the changes should not be
considered ready for production in any way!
I'm not a zope interfaces user, nor a student of mailman project
conventions, so again, I'm not asserting the code, as is, is ready for
integration.
I AM interested in knowing if there's any interest in this, before I
bother setting myself up on gitlab, and bringing the change forward
from 3.3.1.
Something I consider an open issue is that right now I've changed the
default sleep_time in schema.cfg from 1 second to 15 minutes, which is
FAR less than ideal if the code falls back to (blind) sleeping between
directory scans. I considered checking whether the current instance's
class had overridden _do_periodic from the base Runner class and, if
not, sleeping indefinitely (or at least for a "long time"), but that
seemed like a hack. For testing it was easier not to write any ugly
code and just whack schema.cfg.
Since the changes are small, I'm including them in-line below.
==== core/dirwatch.py
"""
Simple OS-independent class that sleeps until a single directory changes
(written for Mailman).
Phil Budne <phil(a)regressive.org>
August 5, 2021
"""
import errno # kqueue
import os # kqueue
import select # kqueue
import time # kqueue, base
from mailman.interfaces.dirwatch import IDirWatcher
from public import public
from zope.interface import implementer
_alternatives = [] # list of classes to try
DEBUG = False # TEMP!!!
@public
@implementer(IDirWatcher)
class BaseDirWatcher:
    """Fallback watcher that simply sleeps; see `IDirWatcher`.

    Platform-specific subclasses override `watch` to return early when
    the directory changes, and fall back to this implementation when
    they cannot.
    """

    # Class-level switch: subclass watch() methods fall back to sleeping
    # when it is false, and factory() skips classes whose USE is false.
    USE = True

    def __init__(self, directory):
        """Create a dirwatch object.

        :param directory: The directory to watch.
        """
        self.directory = directory
        # Always define the attribute so debug() is safe even if DEBUG
        # is switched on after this object was created.
        self.log = None
        # TEMP!!! debugging aid.  NOTE(review): fixed path in a
        # world-writable directory; only suitable for local debugging.
        if DEBUG:
            self.log = open("/tmp/dirwatch.log", "a")

    def watch(self, float_sec):
        """See `IDirWatcher`.

        Sleeps for float_sec seconds; never returns early because of a
        directory change.
        """
        self.debug("BaseDirWatcher sleeping %s", float_sec)  # TEMP
        time.sleep(float_sec)

    def debug(self, format, *args):  # TEMP!!!
        """Append a timestamped, pid-tagged trace line when DEBUG is on.

        :param format: %-style format string for the message.
        :param args: Values interpolated into format.
        """
        if DEBUG and self.log is not None:
            prefix = "%s [%d] " % (time.time(), os.getpid())
            self.log.write(prefix + (format % args) + "\n")
            self.log.flush()
@public
@implementer(IDirWatcher)
class KQueueDirWatcher(BaseDirWatcher):
    """Directory watcher built on BSD kqueue (select.kqueue).

    Registers a KQ_FILTER_VNODE / KQ_NOTE_WRITE filter on a descriptor
    for the directory, so watch() returns as soon as the directory itself
    is written to (an entry is added, removed or renamed).  The
    constructor raises where select has no kqueue (e.g. Linux) or when
    the directory cannot be opened.
    """

    def __init__(self, directory):
        """Open the directory and register it with a new kqueue.

        :param directory: The directory to watch.
        """
        super().__init__(directory)
        self._kq = self._kq_dirfd = None
        try:
            self._kq = select.kqueue()
            self._kq_dirfd = os.open(self.directory, os.O_RDONLY)
            ev = select.kevent(
                self._kq_dirfd,
                filter=select.KQ_FILTER_VNODE,
                # KQ_EV_CLEAR resets the event state once it has been
                # retrieved, so an already-reported write does not keep
                # waking watch().
                flags=select.KQ_EV_ADD | select.KQ_EV_CLEAR,
                fflags=select.KQ_NOTE_WRITE)
            # Install ev, return no events, don't sleep.
            self._kq.control([ev], 0, 0)
        except BaseException:
            # Don't leak the kqueue or directory descriptor when
            # construction fails part-way.
            self._close()
            raise

    def _close(self):
        """Release the kqueue and directory descriptors, if open."""
        if self._kq is not None:
            self._kq.close()
            self._kq = None
        if self._kq_dirfd is not None:
            os.close(self._kq_dirfd)
            self._kq_dirfd = None

    def watch(self, float_sec):
        """Wait up to float_sec seconds for the directory to change.

        See `IDirWatcher`.  Falls back to BaseDirWatcher.watch (a plain
        sleep) when this watcher is disabled or kqueue.control fails.

        NOTE(review): since Python 3.5 (PEP 475) kqueue.control() is
        transparently retried, with a recomputed timeout, when a signal
        arrives whose handler returns normally -- just as time.sleep()
        keeps sleeping.  So the EINTR branch below rarely runs, and a
        signal handler that merely sets a flag does not cut the wait
        short; with a long sleep_time this can delay shutdown.
        """
        self.debug("kq.watch %s %s", self.USE, self._kq)  # TEMP
        if self.USE and self._kq is not None:
            try:
                self.debug('calling kq.control wait=%s', float_sec)  # TEMP
                # No changes to events; return at most 10 events (should
                # only ever return 1?).  The returned events are ignored.
                events = self._kq.control([], 10, float_sec)
                self.debug('kq.control returned %d event(s)',
                           len(events))  # TEMP
                return
            except OSError as exc:
                # suppress EINTR, like time.sleep
                if exc.errno == errno.EINTR:
                    return
                self.debug('kq.control exception: %s', exc)  # TEMP
        # BaseDirWatcher has no sleep(); delegate to its watch().
        super().watch(float_sec)


_alternatives.append(KQueueDirWatcher)
# Linux inotify is not an included battery.
# Ubuntu (and Debian?) have pyinotify (old/unmaintained?), which wants
# to take over the main loop and do callbacks.  If one felt strongly
# about it, one could create a pyinotify.Notifier object with a timeout
# value set, create an event handler that sets a bool in the DirWatcher
# object, and have watch() call {check,read,process}_events (or
# something like that).  But inotify_simple is so much .... simpler.
# Since determining what's available happens at run time, you could
# have BOTH!
try:
    import inotify_simple
except ImportError:
    # Optional dependency: without it this watcher is simply not offered.
    inotify_simple = None

if inotify_simple is not None:
    @public
    @implementer(IDirWatcher)
    class INotifySimpleDirWatcher(BaseDirWatcher):
        """Directory watcher built on Linux inotify via inotify_simple."""

        def __init__(self, directory):
            """Create an inotify instance watching *directory* for renames.

            :param directory: The directory to watch.
            """
            super().__init__(directory)
            self.inotify = inotify_simple.INotify()
            try:
                # Just look for renames (entries moved into, out of, or
                # within the directory).  NOTE(review): a file created in
                # place, without a rename, will not wake this watcher.
                self.inotify.add_watch(directory, inotify_simple.masks.MOVE)
            except BaseException:
                # Don't leak the inotify descriptor if the watch fails.
                self.inotify.close()
                raise
            self.debug('inotify_simple init succeeded')  # TEMP

        def watch(self, float_seconds):
            """Wait up to float_seconds seconds for a rename in the directory.

            See `IDirWatcher`.  Any failure other than EINTR falls back to
            BaseDirWatcher.watch (a plain sleep): first, do no harm.
            """
            self.debug("inotify.watch %s %s", self.USE, self.inotify)  # TEMP
            if self.USE and self.inotify:
                try:
                    # inotify_simple takes its timeout in milliseconds.
                    timeout_ms = int(float_seconds * 1000 + 0.5)
                    events = self.inotify.read(timeout_ms)
                    self.debug("inotify_simple read returned %d",
                               len(events))  # TEMP
                    return
                except OSError as exc:
                    # suppress EINTR, like time.sleep
                    if exc.errno == errno.EINTR:
                        return
                    self.debug("inotify_simple read failed: %s", exc)  # TEMP
                except Exception as exc:
                    # Deliberate best effort: record it, then sleep below.
                    self.debug("inotify_simple read failed: %s", exc)  # TEMP
            super().watch(float_seconds)

    _alternatives.append(INotifySimpleDirWatcher)
def factory(directory):
    """Return a watcher for *directory*, preferring event-driven ones.

    Tries each class registered in _alternatives, in order, skipping any
    whose USE flag is false.  A class whose constructor raises is
    disabled (USE = False) so later calls do not retry it.  If no
    alternative works, returns a BaseDirWatcher, which just sleeps.

    :param directory: The directory to watch.
    """
    for dw_class in _alternatives:
        if not dw_class.USE:
            continue
        try:
            return dw_class(directory)
        except Exception:
            # e.g. no select.kqueue on Linux, or the directory cannot be
            # opened.  This is a module-level function, so there is no
            # watcher instance to log through; just disable the class
            # process-wide and try the next one.
            dw_class.USE = False
    return BaseDirWatcher(directory)
if __name__ == '__main__':
    # Manual smoke test: build the best available watcher for the
    # current directory and wait once.  20 seconds is enough time to
    # type "touch foo" (and "mv foo bar" on Linux, where only renames
    # are watched).
    demo_watcher = factory('.')
    demo_watcher.debug("got %s", demo_watcher)  # TEMP
    demo_watcher.watch(20.0)
==== interfaces/dirwatch.py
"""Interface for directory watchers."""
from public import public
from zope.interface import Interface
@public
class IDirWatcher(Interface):
    """The directory watcher: wait for a directory to change, or time out."""

    # zope.interface convention: interface method declarations omit
    # `self`; including it would declare an extra positional argument.
    def watch(float_sec):
        """Watch for new files in the directory, for at most float_sec seconds.

        Implementations may return as soon as they notice a change, which
        need not be a new file, or only after float_sec seconds.  Callers
        should therefore rescan the directory after every return.

        :param float_sec: Maximum time to wait, in seconds.
        No value is returned.
        """
==== diffs
--- config/schema.cfg-mm3.3.1 2021-07-07 17:24:59.000000000 -0400
+++ config/schema.cfg 2021-08-05 20:53:06.552912000 -0400
@@ -256,7 +256,9 @@
# The sleep interval for the runner. It wakes up once every interval to
# process the files in its slice of the queue directory. Some runners may
# ignore this.
-sleep_time: 1s
+# PLB: With dirwatch there is no need to poll, unless the runner has
+# _do_periodic work.
+sleep_time: 15m
[database]
--- core/runner.py-mm3.3.1 2020-04-10 01:53:06.000000000 -0400
+++ core/runner.py 2021-08-05 17:36:41.769775000 -0400
@@ -293,7 +293,10 @@
"""See `IRunner`."""
if filecnt or self.sleep_float <= 0:
return
- time.sleep(self.sleep_float)
+ if self.switchboard:
+ self.switchboard.snooze(self.sleep_float)
+ else:
+ time.sleep(self.sleep_float)
def _short_circuit(self):
"""See `IRunner`."""
--- interfaces/switchboard.py-mm3.3.1 2020-04-10 01:53:06.000000000 -0400
+++ interfaces/switchboard.py 2021-08-05 17:47:46.712414000 -0400
@@ -84,3 +84,6 @@
time, so moving them is enough to ensure that a normal dequeing
operation will handle them.
"""
+
+ def snooze(self, sleep_float):
+ """Wait at most sleep_float seconds for a change to the queue."""