Hi folks,
I'd like to get some feedback on a multi-threading interface I've been thinking about and using for the past year or so. I won't bury the lede: see my approach here.
Background / problem:
A couple of years ago, I inherited my company's codebase for getting data into our data warehouse using an ELT approach (extract-and-loads done in python, transforms done in dbt/SQL). The codebase has dozens of python scripts that integrate first-party and third-party data from databases, FTPs, and APIs, run on a scheduler (typically daily or hourly). The scripts I inherited were single-threaded procedural scripts, essentially glue code, spending most of their time in network I/O. (See example, or the condensed sketch below.) This got my company pretty far!
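For readers who don't follow the link, here's a condensed, hypothetical sketch of the shape of these scripts; the endpoint, table names, and load_to_warehouse helper are all made up for illustration:

    # Single-threaded procedural glue code: fetch each resource over the
    # network, then load it. Nearly all wall-clock time is network I/O.
    import requests

    TABLES = ["users", "orders", "events"]

    def load_to_warehouse(table, rows):
        ...  # e.g. stage and COPY into the warehouse; details omitted

    def main():
        for table in TABLES:
            resp = requests.get(f"https://api.example.com/{table}")
            resp.raise_for_status()
            load_to_warehouse(table, resp.json())

    if __name__ == "__main__":
        main()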
As my team and I added more and more integrations with more and more data, we wanted faster scripts to shorten our dev cycles and bring our multi-hour nightly jobs down to minutes. Because our scripts were network-bound, multi-threading was a good way to accomplish this, so I looked into concurrent.futures (example) and asyncio (example), but I decided against these options because:
1. It wasn't immediately apparent how to adapt my codebase to use these libraries without some fundamental changes to our execution platform, reworking our scripts from the ground up, or adding significant lines of multi-threading code to each script (see the sketch after this list).
2. I couldn't wrap my head around the async/await and future constructs particularly quickly, and I was concerned that my team would also struggle with this change.
3. I believe the procedural-style glue code we have is quite easy to comprehend, which helps the team maintain and extend the codebase as it grows.
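To illustrate point 1, here's roughly what the earlier sketch becomes when rewritten directly against concurrent.futures (again, all names are hypothetical); the executor and future bookkeeping is what I didn't want repeated in every script:

    # The same hypothetical script using ThreadPoolExecutor directly.
    from concurrent.futures import ThreadPoolExecutor, as_completed

    import requests

    TABLES = ["users", "orders", "events"]

    def load_to_warehouse(table, rows):
        ...  # details omitted

    def extract_and_load(table):
        resp = requests.get(f"https://api.example.com/{table}")
        resp.raise_for_status()
        load_to_warehouse(table, resp.json())

    def main():
        with ThreadPoolExecutor(max_workers=8) as pool:
            futures = [pool.submit(extract_and_load, t) for t in TABLES]
            for future in as_completed(futures):
                future.result()  # re-raise any exception from a worker

    if __name__ == "__main__":
        main()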
Solution:
And so, as mentioned at the top, I designed a different interface on top of concurrent.futures.ThreadPoolExecutor that we are successfully using for our extract-and-load pattern; see a basic example here. The design considerations of this interface include (a rough sketch of the mechanics follows the list):
- The usage is minimally invasive to the original unthreaded approach of the codebase. (As a result, teaching the library to team members has been fairly straightforward despite the multi-threaded paradigm shift.)
- The @parallel.task decorator should encapsulate a homogeneous method that is invoked repeatedly with different parameters. The body of the method should be primarily I/O, since that is where python multi-threading yields concurrency gains (CPU-bound work is serialized by the GIL).
- If no parallel.threads context manager has been entered, or if an environment variable is set to disable the context manager, the @parallel.task decorator acts as a no-op and the code runs serially.
- There is also an environment variable to change the number of workers provided by parallel.threads (if not hard-coded).
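For concreteness, here is a minimal self-contained sketch of how these semantics can be built on ThreadPoolExecutor. This is not our library's actual internals (it skips details like nesting and return values), just one way to realize the behavior described above; the environment variable names are made up:

    import functools
    import os
    from concurrent.futures import ThreadPoolExecutor, wait

    _active = None  # the currently entered threads context, if any

    class threads:
        """Context manager mirroring the described parallel.threads."""
        def __init__(self, workers=None):
            # Hypothetical environment variable names, for illustration.
            self._disabled = os.environ.get("PARALLEL_DISABLED") == "1"
            self._workers = workers or int(os.environ.get("PARALLEL_WORKERS", "8"))
            self._futures = []

        def __enter__(self):
            global _active
            if not self._disabled:
                self._pool = ThreadPoolExecutor(max_workers=self._workers)
                _active = self
            return self

        def __exit__(self, *exc_info):
            global _active
            if not self._disabled:
                _active = None
                wait(self._futures)        # barrier: all tasks complete here
                self._pool.shutdown()
                for f in self._futures:
                    f.result()             # surface any task exceptions
            return False

        def _submit(self, fn, *args, **kwargs):
            self._futures.append(self._pool.submit(fn, *args, **kwargs))

    def task(fn):
        """Decorator mirroring the described @parallel.task."""
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            if _active is None:            # outside the context: plain call
                return fn(*args, **kwargs)
            _active._submit(fn, *args, **kwargs)
        return wrapper

    # Usage, matching the glue-code style of the original scripts:
    @task
    def load_table(name):
        ...  # primarily network I/O

    with threads(workers=4):
        for name in ["users", "orders", "events"]:
            load_table(name)               # submitted to the pool
    # all loads are complete once the with-block exits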
While it's possible to return a value from a @parallel.task method, I encourage my team to use the decorator for work that starts and completes within the method; think of writing "embarrassingly parallel" methods that can be "mapped".
A couple of other things we've implemented include a "thread barrier" for cases where we want one set of tasks to complete before another set begins, and a decorator for factory methods that produces cached thread-local objects (helpful for ensuring thread-safe access to network clients that are not themselves thread-safe; a sketch of the latter follows).
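The thread-local factory idea can be expressed with functools and threading.local; again, this is a hedged illustration rather than our exact implementation, and the FTP host is made up:

    # Cache the factory's return value per thread, so each worker thread
    # constructs and reuses its own client instance.
    import functools
    import threading

    def thread_local_cached(factory):
        local = threading.local()

        @functools.wraps(factory)
        def wrapper():
            if not hasattr(local, "obj"):
                local.obj = factory()
            return local.obj

        return wrapper

    @thread_local_cached
    def get_ftp_client():
        # ftplib.FTP objects are not safe to share across threads, so
        # each thread gets its own connection, created once per thread.
        import ftplib
        return ftplib.FTP("ftp.example.com")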
Your feedback:
- I'd love to hear your thoughts on my problem and solution.
- I've done a bit of research on existing libraries on PyPI and on PEPs, but I haven't found anything similar; are you aware of anything?
- What do you suggest I do next? I'm considering publishing it, but I could use some tips on what to do here!
Thanks!
Sean McIntyre