bpo-34837: Multiprocessing.Pool API Extension - Pass Data to Workers w/o Globals

I am proposing an extension to the multiprocessing.Pool API that allows for an alternative way to pass data to Pool worker processes, *without* using globals.

A PR has been opened <https://github.com/python/cpython/pull/9627>. It includes extensive test coverage, and all tests and CI pass on GitHub. Please see this blog post <https://thelaziestprogrammer.com/python/multiprocessing-pool-expect-initret-...> for details, motivation, and use cases of the API extension before reading on.

In *short*, the implementation of the feature works as follows:

1. Expose a kwarg on Pool.__init__ called `expect_initret` that defaults to False. When set to True:
   1. Capture the return value of the `initializer` kwarg of Pool.
   2. Pass this value to the function being applied, as a kwarg.

Again, in *short*, the motivation of the feature is to provide an explicit "flow of data" from parent process to worker process, and to avoid being *forced* to use the *global* keyword in the initializer, or being *forced* to create global variables in the parent process. The interface is 100% backwards compatible through Python 3.x (and perhaps beyond).
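A minimal, process-free sketch of what one worker would do under the proposed flag. `expect_initret` comes from the PR and is not part of the released `Pool` API; the helper `simulate_worker`, the example functions, and the kwarg name `initret` used to deliver the value are all assumptions made for illustration.

```python
from typing import Any, Callable, Iterable, List, Optional


def simulate_worker(
    func: Callable[..., Any],
    items: Iterable[Any],
    initializer: Optional[Callable[..., Any]] = None,
    initargs: tuple = (),
    expect_initret: bool = False,
) -> List[Any]:
    """Hypothetical model of one worker: run the initializer once, then apply func to each item."""
    # Today, Pool calls initializer(*initargs) once per worker and discards its return value.
    initret = initializer(*initargs) if initializer is not None else None
    results = []
    for item in items:
        if expect_initret:
            # Proposed behaviour: forward the initializer's return value as a keyword
            # argument (the name "initret" is an assumption for this sketch).
            results.append(func(item, initret=initret))
        else:
            results.append(func(item))
    return results


def build_lookup(n: int) -> dict:
    # Stand-in for an expensive per-worker resource.
    return {k: k * k for k in range(n)}


def square_from_lookup(x: int, initret: Optional[dict] = None) -> int:
    return initret[x]


squares = simulate_worker(square_from_lookup, [1, 2, 3],
                          initializer=build_lookup, initargs=(10,), expect_initret=True)
```

The sketch also makes Antoine's first objection below concrete: `square_from_lookup`'s signature has to agree with a flag that is set on a different object.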

Hi, On Fri, 28 Sep 2018 17:07:33 -0400 Sean Harrington <seanharr11@gmail.com> wrote:
Thanks for taking the time to explain your use case and write a proposal. My reactions to this are:
1. The proposed API is ugly. This basically allows you to pass an argument which changes with which arguments another function is later called...
2. A global variable seems like the adequate way to represent a process-global object (which is exactly your use case).
3. If you don't like globals, you could probably do something like lazily-initialize the resource when a function needing it is executed; this also avoids creating the resource if the child doesn't use it at all. Would that work for you?
As a more general remark, I understand the desire to make the Pool object more flexible, but we also cannot keep piling up features until it satisfies every use case. As another general remark, concurrent.futures is IMHO the preferred API for the future, and where feature work should probably concentrate. Regards Antoine.

Hi Antoine - see inline below for my response...thanks for your time! On Fri, Sep 28, 2018 at 6:45 PM Antoine Pitrou <solipsis@pitrou.net> wrote:
2. A global variable seems like the adequate way to represent a
process-global object (which is exactly your use case)
3. If you don't like globals, you could probably do something like them for every Pool use case. Further, if initializing the resource is expensive, we only want to do this ONE time per worker process. So no, this will not ~always~ work.

Hi Sean, On Fri, 28 Sep 2018 19:23:06 -0400 Sean Harrington <seanharr11@gmail.com> wrote:
IMHO, global variables don't break encapsulation if they remain private to the module where they actually play a role. Of course, there are also global-like alternatives to globals, such as class attributes... The multiprocessing module itself uses globals (or quasi-globals) internally for various implementation details.
That's what I meant with lazy initialization: initialize it if not already done, otherwise just use the already-initialized resource. It's a common pattern. (you can view it as a 1-element cache if you prefer)
Hmm... We might have a disagreement on the target audience of the multiprocessing module. multiprocessing isn't very high-level, I would expect it to be used by experienced programmers who know how to mutate a global variable from a lexical scope. For non-programmer end-users, such as data scientists, there are higher-level libraries such as Celery (http://www.celeryproject.org/) and Dask distributed (https://distributed.readthedocs.io/en/latest/). Perhaps it would be worth mentioning them in the documentation. Regards Antoine.
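The lazy-initialization pattern can be sketched with `functools.lru_cache` (mentioned later in the thread) acting as the one-element cache, so no `global` statement is needed. `get_resource`, `task`, and the `CREATED` list are illustrative names; in a real `Pool`, each worker process would build its own copy on first use.

```python
from functools import lru_cache

CREATED = []  # records each construction, so the sketch can show it happens only once


@lru_cache(maxsize=None)
def get_resource():
    # Stand-in for an expensive per-process resource (e.g. a database connection).
    CREATED.append("built")
    return {"offset": 100}


def task(x):
    # The first call in a process builds the resource; later calls reuse the cached object.
    return x + get_resource()["offset"]


results = [task(i) for i in range(3)]
```

This also covers the "don't create it if the child never needs it" point: a worker that never calls `task` never builds the resource.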

On Sat, Sep 29, 2018 at 6:24 AM Antoine Pitrou <solipsis@pitrou.net> wrote:
Yes, class attributes are a viable alternative. I've written about
this here. <https://thelaziestprogrammer.com/python/multiprocessing-pool-a-global-soluti...> Still, the argument is not against global variables, class attributes or any close cousins -- it is simply that developers shouldn't be forced to use these.
It is one thing to MUTATE a global from a lexical scope. No gripes
there. The specific concept I'm referencing here is "DECLARING a global variable from within a lexical scope". This is not as intuitive for most programmers.
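The class-attribute alternative discussed above can be sketched as follows: the initializer rebinds an attribute on a class, so no `global` declaration appears anywhere. `WorkerState`, `init_worker`, `lookup`, and `run` are hypothetical names; `run` uses only the documented `initializer`/`initargs` parameters of `Pool` and is meant to be called from under an `if __name__ == "__main__":` guard.

```python
from multiprocessing import Pool


class WorkerState:
    """Per-process holder; each worker process has its own copy of this class object."""
    resource = None


def init_worker(resource):
    # Rebinding a class attribute needs no `global` declaration.
    WorkerState.resource = resource


def lookup(key):
    return WorkerState.resource[key]


def run(keys, resource, processes=2):
    # Each worker runs init_worker(resource) once before processing any tasks.
    with Pool(processes, initializer=init_worker, initargs=(resource,)) as pool:
        return pool.map(lookup, keys)
```

It is still process-global state, as Antoine notes; it just avoids the `global` keyword.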

On Sat, 29 Sep 2018 08:13:19 -0400 Sean Harrington <seanharr11@gmail.com> wrote:
Well, you don't have to. You can bind it to None in the top-level scope and then mutate it from the lexical scope:

    my_resource = None

    def do_work():
        global my_resource
        my_resource = ...

Regards Antoine.

On Fri, Sep 28, 2018 at 2:11 PM Sean Harrington <seanharr11@gmail.com> wrote:
The parameter name you chose, "initret", is awkward, because nowhere else in Python does an initializer return a value. Initializers mutate an encapsulated scope. For a class __init__, that scope is an instance's attributes. For a subprocess managed by Pool, that encapsulated scope is its "globals". I'm using quotes to emphasize that these "globals" aren't shared. On Fri, Sep 28, 2018 at 4:39 PM Sean Harrington <seanharr11@gmail.com> wrote:
We must have a different concept of "lazily-initialize". I understood Antoine's suggestion to be a one-time initialize per worker process. On Fri, Sep 28, 2018 at 4:39 PM Sean Harrington <seanharr11@gmail.com> wrote:
My simple argument is that the developer should not be constrained to make the objects passed globally available in the process, as this MAY break encapsulation for large projects.
I could imagine someone switching from Pool to ThreadPool and getting into trouble, but in my mind using threads is caveat emptor. Are you worried about breaking encapsulation in a different scenario?

On Sat, Sep 29, 2018 at 5:24 AM Sean Harrington <seanharr11@gmail.com> wrote:
Echoing Antoine: If you want some functions to not have access to a module's globals, you can put those functions in a different module. Note that multiprocessing already encapsulates each subprocess's globals in essentially a separate namespace. Without a specific example, this discussion is going to go around in circles. You have a clear aversion to globals. Antoine and I do not. No one else seems to have found this conversation interesting enough to participate, yet.

Hi guys - The solution to "lazily initialize" an expensive object in the worker process (e.g. via @lru_cache) is a great solution (one that I must admit I did not think of). Additionally, in the second use case of "*passing a large object to each worker process*", I also agree with your suggestion to "shelter functions in a different module to avoid exposure to globals" as a good solution if one is wary of globals.

That said, I still think "*passing a large object from parent process to worker processes*" should be easier when using Pool. Would either of you be open to something like the following?

    from multiprocessing import Pool

    def func(x, big_cache=None):
        return big_cache[str(x)]  # the keys are strings, so convert the int item

    big_cache = {str(k): k for k in range(10000)}
    ls = [i for i in range(1000)]

    # func_kwargs is the proposed parameter; Pool does not accept it today.
    with Pool(func_kwargs={"big_cache": big_cache}) as pool:
        pool.map(func, ls)

It's a much cleaner interface (which presumably requires a more difficult implementation) than my initial proposal. This also reads a lot better than the "initializer + global" recipe (clear flow of data), and is less constraining than the "define globals in parent" recipe. Most importantly, when taking sequential code and parallelizing via Pool.map, this does not force the user to re-implement "func" such that it consumes a global (rather than a kwarg). It allows "func" to be used elsewhere (e.g. in the parent process, from a different module, testing w/o globals, etc.). This would essentially be an efficient implementation of Pool.starmap(), where kwargs are static, and passed to each application of "func" over our iterable. Thoughts? On Sat, Sep 29, 2018 at 3:00 PM Michael Selik <mike@selik.org> wrote:

Starmap will serialize/deserialize the “big object” once for each task created, so this is not performant. The goal is to pay the “one time cost” of serialization of the “big object”, and still pass this object to func at each iteration. On Thu, Oct 4, 2018 at 4:14 AM Michael Selik <mike@selik.org> wrote:
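The per-task cost of the starmap approach can be checked without starting a pool, assuming (as in CPython's `Pool`) that `starmap` groups argument tuples into chunks and pickles each chunk as one task. Pickle's memo writes a repeated object only once *within* a single `dumps()` call, so the dict is serialized once per chunk rather than once per item, but still once per task. `starmap_payload_bytes` is a hypothetical helper.

```python
import pickle


def starmap_payload_bytes(big, items, chunksize):
    """Bytes pickled if (x, big) argument tuples are sent in chunks, one pickle per chunk."""
    total = 0
    for start in range(0, len(items), chunksize):
        chunk = [(x, big) for x in items[start:start + chunksize]]
        # The memo deduplicates `big` inside this chunk only; the next chunk repeats it.
        total += len(pickle.dumps(chunk))
    return total


big_cache = {str(k): k for k in range(10000)}
ls = list(range(1000))

once = len(pickle.dumps(big_cache))                            # one serialization of the dict
per_task = starmap_payload_bytes(big_cache, ls, chunksize=50)  # 20 chunks of 50 items
```

With 20 chunks, `per_task` comes out at roughly 20 times `once`, which is the repeated cost the "one time" serialization goal is trying to avoid.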

On Wed, Oct 3, 2018 at 6:30 PM, Sean Harrington <seanharr11@gmail.com> wrote:
with Pool(func_kwargs={"big_cache": big_cache}) as pool: pool.map(func, ls)
I feel like it would be nicer to spell this:

    with Pool() as pool:
        pool.map(functools.partial(func, big_cache=big_cache), ls)

And this might also solve your problem, if pool.map is clever enough to only send the function object once to each worker? -n -- Nathaniel J. Smith -- https://vorpus.org
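Whether `functools.partial` helps depends on how often the pool pickles the callable: a partial pickles its bound arguments along with it, so its payload is at least about as large as the dict it wraps. The sketch below uses `operator.getitem` as a stand-in for `func` (an assumption, so the example pickles without depending on a `__main__` module) and simply compares sizes.

```python
import functools
import operator
import pickle

big_cache = {str(k): k for k in range(10000)}

# Stand-in for functools.partial(func, big_cache=big_cache): bind the big dict into the callable.
lookup = functools.partial(operator.getitem, big_cache)

func_bytes = len(pickle.dumps(lookup))      # the callable, including the bound dict
cache_bytes = len(pickle.dumps(big_cache))  # the dict on its own
```

In CPython's `Pool.map`, the callable travels inside every chunked task, so the dict bound into the partial is re-serialized once per chunk unless the function is sent to each worker only once.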

Hi Nathaniel - if this solution can be made performant, then I would be more than satisfied. I think this would require removing "func" from the "task tuple", and storing the "func" "once per worker" somewhere globally (maybe a class attribute set post-fork?). This also has the beneficial outcome of increasing general performance of Pool.map and friends. I've seen MANY folks across the interwebs doing things like passing instance methods to map, resulting in "big" tasks, and slower-than-sequential parallelized code. Parallelizing "instance methods" by passing them to map, w/o needing to wrangle with staticmethods and globals, would be a GREAT feature! It'd be as easy as: pool.map(self.func, ls). What do you think about this idea? This is something I'd be able to take on, assuming I get a few core dev blessings... On Thu, Oct 4, 2018 at 6:15 AM Nathaniel Smith <njs@pobox.com> wrote:

On Fri, 12 Oct 2018 08:33:32 -0400 Sean Harrington <seanharr11@gmail.com> wrote:
Well, I'm not sure how it would work, so it's difficult to give an opinion. How do you plan to avoid passing "self"? By caching (by equality? by identity?)? Something else? But what happens if "self" changed value (in the case of a mutable object) in the parent? Do you keep using the stale version in the child? That would break compatibility... Regards Antoine.

The implementation details need to be fleshed out, but agnostic of these, do you believe this is a valid solution to the initial problem? Do you also see it as a beneficial optimization to Pool, given that we don't need to store funcs/bound-methods/partials on the tasks themselves? The latter concern about "what happens if `self` changed value in the parent" is the same concern as "what happens if `func` changes in the parent?" given the current implementation. This is an assumption that is currently made with Pool.map_async(func, ls). If "func" changes in the parent, there is no communication with the child. So one just needs to be aware that calling "map_async(self.func, ls)" while the state of "self" is changing will not communicate changes to each worker. The state is frozen when Pool.map is called, just as is the case now. On Fri, Oct 12, 2018 at 9:07 AM Antoine Pitrou <solipsis@pitrou.net> wrote:

Le 12/10/2018 à 15:17, Sean Harrington a écrit :
I'm not sure, TBH. I also think it may be better to leave this to higher levels (for example Dask will intelligently distribute data on workers and let you work with a kind of proxy object in the main process, transferring data only when necessary).
If you cache "self" between pool.map calls, then the question is not "what happens if self changes *during* a map() call" but "what happens if self changes *between* two map() calls"? While the former is intuitively undefined, current users would expect the latter to have a clear answer, which is: the latest version of self when map() is called is taken into account. Regards Antoine.

I would contend that this is much more granular than Dask - this is just an optimization of Pool.map() to avoid redundantly passing the same `func` repeatedly, once per task, to each worker, with the primary goal of eliminating redundant serialization of large-memory-footprinted Callables. This is a different use case than Dask - I don't intend to approach the shared memory or distributed computing realms. And the second call to Pool.map would update the cached "self" as a part of its initialization workflow, s.t. "the latest version of self when map() is called is taken into account". Do you see a difficulty in accomplishing the second behavior? On Fri, Oct 12, 2018 at 9:25 AM Antoine Pitrou <antoine@python.org> wrote:

Le 12/10/2018 à 16:49, Sean Harrington a écrit :
Only if it has changed, then, right? I suspect that would work, but it will break compatibility in some cases (think of a mutable object that hasn't defined equality - so it defaults to identity). It's also introducing a complication in the API which people didn't have to think of before. The fact that you're doing all this in order to eschew global variables for global resources doesn't warm me much to the idea. Unless other core developers are enthusiastic I'm not willing to integrate such a change. Regards Antoine.

On Fri, Oct 12, 2018, 06:09 Antoine Pitrou <solipsis@pitrou.net> wrote:
I was just suggesting that within a single call to Pool.map, it would be a reasonable optimization to only send the fn once to each worker. So e.g. if you have 5 workers and 1000 items, you'd only pickle fn 5 times, rather than 1000 times like we do now. I wouldn't want to get any fancier than that with caching data between different map calls or anything. Of course even this may turn out to be too complicated to implement in a reasonable way, since it would require managing some extra state on the workers. But semantically it would be purely an optimization of current semantics. -n

@Nathaniel this is what I am suggesting as well. No caching - just storing the `fn` on each worker, rather than pickling it for each item in our iterable. As long as we store the `fn` post-fork on the worker process (perhaps as a global), subsequent calls to Pool.map shouldn't be affected (referencing Antoine's & Michael's points that "multiprocessing encapsulates each subprocesses globals in a separate namespace"). @Antoine - I'm making an effort to take everything you've said into consideration here. My initial PR and talk <https://www.youtube.com/watch?v=DH0JVSXvxu0> were intended to shed light on a couple of pitfalls that I often see Python end-users encounter with Pool. Moving beyond my naive first attempt, and the onslaught of deserved criticism, it seems that we have an opportunity here: No changes to the interface, just an optimization to reduce the frequency of pickling. Raymond Hettinger may also be interested in this optimization, as he speaks (with great analogies) about different ways you can misuse concurrency in Python <https://www.youtube.com/watch?v=9zinZmE3Ogk>. This would address one of the pitfalls that he outlines: the "size of the serialized/deserialized data". Is this an optimization that either of you would be willing to review, and accept, if I find there is a *reasonable way* to implement it? On Fri, Oct 12, 2018 at 3:40 PM Nathaniel Smith <njs@pobox.com> wrote:

Would this change the other pool method behavior in some way if the user, for whatever reason, mixed techniques? imap_unordered will only block when nexting the generator. If the user mingles nexting that generator with, say, apply_async, could the change you're proposing have some side-effect? On Tue, Oct 16, 2018, 5:09 AM Sean Harrington <seanharr11@gmail.com> wrote:

Is your concern something like the following?

    with Pool(8) as p:
        gen = p.imap_unordered(func, ls)
        first_elem = next(gen)
        p.apply_async(long_func, (x,))  # apply_async takes its args as a tuple
        remaining_elems = [elem for elem in gen]

...here, if we store "func" on each worker Process as a global and execute the pattern above, we will likely alter the state of one of the worker processes s.t. it stores "long_func" in place of the initial "func". So yes, this could break things. *A potential solution*: Replace "func" in the task tuple with an identifier (maybe, *perhaps
This would avoid the weird stateful bug above. We could also do something slightly different to this if folks are averse to the "Pool class-attribute func map" (i.e. averse to globals), and store this map as an Instance Attribute on the Pool object, and wrap the "initializer" func to make the map globally available in the worker via the "global" keyword. One note: this isn't a "cache", it's just a global map which has its keys & values updated *blindly* with every call to Pool.<public_method>. It serves as a way to bypass repeated serialization of functions in Pool, which can be large when bound to big objects (like large class instances, or functools.partial objects). On Tue, Oct 16, 2018 at 9:27 AM Michael Selik <michael.selik@gmail.com> wrote:
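The stateful bug and the identifier-keyed fix can be modelled in a single process. Both worker classes are hypothetical stand-ins for pool worker processes, and the builtins `abs` and `str` play the roles of `func` and `long_func`.

```python
class SingleSlotWorker:
    """Naive model: the worker keeps only the most recently shipped function."""

    def __init__(self):
        self.func = None

    def receive(self, func):
        self.func = func

    def run(self, x):
        return self.func(x)


class KeyedWorker:
    """Model of the proposed fix: each task carries an identifier naming its function."""

    def __init__(self):
        self.funcs = {}

    def receive(self, key, func):
        self.funcs[key] = func

    def run(self, key, x):
        return self.funcs[key](x)


naive = SingleSlotWorker()
naive.receive(abs)             # imap_unordered(func, ls) ships func
first = naive.run(-3)          # 3
naive.receive(str)             # apply_async(long_func, ...) overwrites the only slot
second = naive.run(-4)         # '-4': the next imap item now runs long_func

keyed = KeyedWorker()
keyed.receive("imap", abs)
keyed.receive("apply", str)
fixed = keyed.run("imap", -4)  # 4: the identifier selects the right function
```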

If imap_unordered is currently re-pickling and sending func each time it's called on the worker, I have to suspect there was some reason to do that and not cache it after the first call. Rather than assuming that's an opportunity for an optimization, I'd want to be certain it won't have edge case negative effects. On Tue, Oct 16, 2018 at 2:53 PM Sean Harrington <seanharr11@gmail.com> wrote:
My concern was passing the same function (or a function with the same qualname). You're suggesting caching functions and identifying them by qualname to avoid re-pickling a large stateful object that's shoved into the function's defaults or closure. Is that a correct summary? If so, how would the function cache distinguish between two functions with the same name? Would it need to examine the defaults and closure as well? If so, that means it's pickling the second one anyway, so there's no efficiency gain.

    In [1]: def foo(a):
       ...:     def bar():
       ...:         print(a)
       ...:     return bar

    In [2]: f = foo(1)

    In [3]: g = foo(2)

    In [4]: f
    Out[4]: <function __main__.foo.<locals>.bar()>

    In [5]: g
    Out[5]: <function __main__.foo.<locals>.bar()>

If we say pool.apply_async(f) and pool.apply_async(g), would you want the latter one to avoid serialization, letting the worker make a second call with the first function object?

You have correctly identified the summary of my intentions, and I agree with your reasoning & concern - however there is a somewhat reasonable answer as to why this optimization has never been implemented: In Pool, the `task` tuple consists of (result_job, func, (x,), {}). This is the object that is serialized/deserialized b/t processes. The only thing we really care about here is the tuple `(x,)`, confusingly, not `func` (func is ACTUALLY either mapstar() or starmapstar(), which is called with (x,) as its *args). Our element of interest is `(x,)` - a tuple of (func, iterable). Because we need to temper the size of the `iterable` bundled in each task, to avoid de/serialization slowness, we usually end up with multiple tasks per worker, and thus multiple `func`s per worker. Thus, this is really only an optimization in the case of really big functions/closures/partials (or REALLY big iterables with an unreasonably small chunksize passed to map()). The most common use case comes up when passing instance methods (of really big objects!) to Pool.map(). This post <https://thelaziestprogrammer.com/python/a-multiprocessing-pool-pickle#stuck-...> may color in the above with more details. Further, let me pivot on my idea of __qualname__...we can use the `id` of `func` as the cache key to address your concern, and store this `id` on the `task` tuple (i.e. an integer in lieu of the `func` previously stored there). On Thu, Oct 18, 2018 at 12:49 AM Michael Selik <michael.selik@gmail.com> wrote:
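How many times the callable is pickled per `map()` call can be worked out from the chunking heuristic that CPython's `Pool.map` applies when `chunksize` is not given (about four chunks per worker). The helper names are illustrative; the arithmetic mirrors the heuristic in `Lib/multiprocessing/pool.py`.

```python
def default_chunksize(n_items: int, n_workers: int) -> int:
    """Chunk size CPython's Pool.map picks when chunksize=None."""
    if n_items == 0:
        return 0
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def num_tasks(n_items: int, chunksize: int) -> int:
    """Number of task tuples, i.e. how many times the callable is pickled."""
    if n_items == 0:
        return 0
    return -(-n_items // chunksize)  # ceiling division


# The 5-worker, 1000-item example from earlier in the thread.
size = default_chunksize(1000, 5)  # 50
tasks = num_tasks(1000, size)      # 20
```

So with default settings the function is pickled once per chunk (20 times here) rather than once per item; sending it once per worker would bring that down to 5.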

On Thu, Oct 18, 2018 at 8:35 AM Sean Harrington <seanharr11@gmail.com> wrote:
The most common use case comes up when passing instance methods (of really big objects!) to Pool.map().
This reminds me of that old joke: "A patient says to the doctor, 'Doctor, it hurts when I ...!' The doctor replies, 'Well, don't do that.'" Further, let me pivot on my idea of __qualname__...we can use the `id` of
Possible. Does the Pool keep a reference to the passed function in the main process? If not, couldn't the garbage collector free that memory location and a new function could replace it? Then it could have the same qualname and id in CPython. Edge case, for sure. Worse, it'd be hard to reproduce as it'd be dependent on the vagaries of memory allocation.

One idea would be for the Pool method to generate a uuid and slap it on the function as an attribute. If a function being passed in doesn't have one, generate one. If it already has one, just pass that instead of pickling. The child process will keep a cache mapping uuids to functions. I'm still worried about unintended consequences. On Thu, Oct 18, 2018 at 9:00 AM Michael Selik <michael.selik@gmail.com> wrote:
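The uuid idea can be sketched in one process: the parent tags the callable with a token and ships the pickled callable only the first time that token is seen, and the worker keeps a token-to-callable cache. `TOKEN_ATTR`, `encode_task`, and `WorkerCache` are hypothetical. Note that bound methods reject attribute assignment (`AttributeError`), so the tagging step would not work for the instance-method case without some other mechanism. The last check in the usage also reproduces the limitation raised later in the thread: a worker that never saw the first message cannot run the second.

```python
import functools
import operator
import pickle
import uuid

TOKEN_ATTR = "_pool_func_token"  # hypothetical attribute name


def encode_task(func, args, sent_tokens):
    """Parent side: include the pickled callable only the first time its token is seen."""
    token = getattr(func, TOKEN_ATTR, None)
    if token is None:
        token = uuid.uuid4().hex
        # Works for plain functions and functools.partial objects; bound methods
        # raise AttributeError here.
        setattr(func, TOKEN_ATTR, token)
    payload = None
    if token not in sent_tokens:
        payload = pickle.dumps(func)
        sent_tokens.add(token)
    return pickle.dumps((token, payload, args))


class WorkerCache:
    """Child side: maps tokens to already-unpickled callables."""

    def __init__(self):
        self.funcs = {}

    def run(self, message):
        token, payload, args = pickle.loads(message)
        if payload is not None:
            self.funcs[token] = pickle.loads(payload)
        # Raises KeyError if this worker never received the callable.
        return self.funcs[token](*args)


big = {str(k): k for k in range(10000)}
lookup = functools.partial(operator.getitem, big)
sent = set()
first_msg = encode_task(lookup, ("7",), sent)   # carries the pickled partial
second_msg = encode_task(lookup, ("8",), sent)  # carries only the token and args

worker = WorkerCache()
first_result = worker.run(first_msg)    # 7
second_result = worker.run(second_msg)  # 8, served from the worker's cache
```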

On Thu, Oct 18, 2018 at 9:11 AM Michael Selik <michael.selik@gmail.com> wrote:
I'm not following this thread closely, but I just wanted to point out that __qualname__ won't necessarily be an attribute of the object if the API accepts any callable. (I happen to be following an issue on the tracker where this came up.) --Chris
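Both concerns about name-based cache keys are quick to confirm: closures built by the same factory share a `__qualname__`, and a `functools.partial` object has no `__qualname__` at all. `foo`/`bar` follow Michael's example above, with `bar` returning its value instead of printing it.

```python
import functools


def foo(a):
    def bar():
        return a
    return bar


f = foo(1)
g = foo(2)

# Same qualified name, different captured state: __qualname__ alone cannot tell them apart.
same_name = f.__qualname__ == g.__qualname__  # True: both end in 'foo.<locals>.bar'
different = (f(), g())                         # (1, 2)

# Not every callable has a __qualname__: functools.partial objects do not.
p = functools.partial(pow, 2)
partial_name = getattr(p, "__qualname__", None)  # None
```

Closures like `f` and `g` also cannot be pickled by the standard `pickle` module, so `Pool` could not ship them in the first place.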

Hello, I have been working on the concurrent.futures module lately and I think this optimization should be avoided in the context of Python Pools.

This is an interesting idea; however, its implementation will bring many complicated issues, as it breaks the basic paradigm of a Pool: the tasks are independent and you don't know which worker is going to run which task. The function is serialized with each task because of this paradigm. This ensures that any worker picking up the task will be able to perform it independently of the tasks it has run before, given that it has been initialized correctly at the beginning. This makes it simple to run each task.

As the Pool comes with no scheduler, with your idea you would need a synchronization step to send the function to all workers before you can launch your task. But if one worker is already performing a long-running task, does the Pool wait for it to finish before it sends the function? If the Pool doesn't wait, how does it ensure that this worker will get the definition of the function before running it? Also, multiprocessing.Pool has some features where a worker can shut itself down after a given number of tasks or a timeout. How does it ensure that the new worker will have the definition of the function? It is unsafe to try such a feature (sending an object only once) anywhere other than in the initializer, which is guaranteed to run once per worker.

On the other hand, you mentioned an interesting point: making globals available in the workers could be made simpler. A possible solution would be to add a "globals" argument to the Pool which would instantiate global variables in the workers. I have no specific idea on the implementation of such a feature, but it would be safer as it would be an initialization feature. Regards, Thomas Moreau On Thu, Oct 18, 2018, 22:20 Chris Jerdonek <chris.jerdonek@gmail.com> wrote:

On Fri, Oct 19, 2018 at 9:09 AM Thomas Moreau <thomas.moreau.2010@gmail.com> wrote:
I would not mind if there would be a subtype of Pool where you can only apply one kind of task to. This is a very common use mode. Though the question there is 'should this live in Python itself'? I'd be fine with a package on PyPi. As the Pool comes with no scheduler, with your idea, you would need a
Would this also mean one could use a Pool in a context where threading is used? Currently, using threading has the side effect of pulling unpicklable objects into the globals. Also, being able to pass in globals=None would be optimal for a lot of use cases. -- Joni Orponen

On Fri, Oct 19, 2018 at 7:32 AM Joni Orponen <j.orponen@4teamwork.ch> wrote:
Though the question there is 'should this live in Python itself'? I'd be
fine with a package on PyPi.
Thomas makes a good point: despite the common user mode of calling Pool.map() once, blocking, and returning, the need for serialization of functions within tasks arises when calling Pool.map() (and friends) while workers are still running (i.e. there was a previous call to Pool.map_async()). However, this is an uncommon user mode, as Joni points out. The most common user mode is that your Pool workers are only ever executing one type of task at a given time. This opens up optimization opportunities, so long as we store state on the subclassed Pool object (SingleTaskPool?) of whether or not a single task is running or has completed, to prevent the user from getting into the funky state above. I would rather see this included in the multiprocessing stdlib, as it seemingly will not introduce a lot of new code and would benefit from existing tests. Most importantly, it optimizes (in my opinion) the most common user mode of Pool.
We could do this - but we can easily get the same behavior by declaring a "global" in "initializer" (albeit a pattern which I do not like). I like the idea to extend the Pool class a bit more, but this is also my opinion.

On Fri, Oct 19, 2018 at 5:01 AM Sean Harrington <seanharr11@gmail.com> wrote:
I like the idea to extend the Pool class [to optimize the case when only one function is passed to the workers].
Why would this keep the same interface as the Pool class? If its workers are restricted to calling only one function, that should be passed into the Pool constructor. The map and apply methods would then only receive that function's args and not the function itself. You're also trying to avoid the initializer/globals pattern, so you could eliminate that parameter from the Pool constructor. In fact, it sounds more like you'd want a function than a class. You can call it "procmap" or similar. That's code I've written more than once.

    results = procmap(func, iterable, processes=cpu_count())

The nuance is that, since there's no explicit context manager, you'll want to ensure the pool is shut down after all the tasks are finished, even if the results generator hasn't been fully consumed.
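The function-shaped alternative might look like the sketch below. `procmap` is only the name suggested in the thread, not a stdlib function, and `pool_factory` is an added parameter (an assumption) so the same code can be exercised with `ThreadPool`. Because it is a generator, the `finally` block shuts the pool down when results are exhausted, when the caller calls `close()`, or when the generator is garbage-collected.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def procmap(func, iterable, processes=None, chunksize=1, pool_factory=Pool):
    """Hypothetical 'procmap': lazily yield func(item) results from a private pool."""
    pool = pool_factory(processes)  # processes=None means os.cpu_count() workers
    try:
        # imap preserves input order and yields results as they become available.
        yield from pool.imap(func, iterable, chunksize)
    finally:
        # Runs even if the caller stops consuming results early.
        pool.terminate()
        pool.join()


# Quick check with threads: ThreadPool shares Pool's interface and needs no pickling.
absolutes = list(procmap(abs, [-1, -2, 3], processes=2, pool_factory=ThreadPool))
```

With the default `Pool`, the same caveats as any process pool apply: `func` must be picklable and the call should live under an `if __name__ == "__main__":` guard.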

Michael - the initializer/globals pattern still might be necessary if you need to create an object AFTER a worker process has been instantiated (i.e. a database connection). Further, the user may want to access all of the niceties of Pool, like imap, imap_unordered, etc. The goal (IMO) would be to preserve an interface which many Python users have grown accustomed to, and to allow them to access this optimization out of the box. Having talked to folks at the Boston Python meetup, folks on my dev team, and perusing Stack Overflow, this "instance method parallelization" is a pretty common pattern that is oftentimes a negative return on investment for the developer, due to the implicit implementation detail of pickling the function (and object) once per task. Is anyone open to reviewing a PR concerning this optimization of Pool, delivered as a subclass? This feature restricts the number of unique tasks being executed by workers at once to 1, while allowing aggressive subprocess-level function caching to prevent repeated serialization/deserialization of large functions/closures. The use case is s.t. the user only ever needs 1 call to Pool.map(func, ls) (or friends) executing at once, when `func` has a non-trivial memory footprint. On Fri, Oct 19, 2018 at 4:06 PM Michael Selik <mike@selik.org> wrote:

This thread seems more appropriate for python-ideas than python-dev. On Mon, Oct 22, 2018 at 5:28 AM Sean Harrington <seanharr11@gmail.com> wrote:
You said you wanted to avoid the initializer/globals pattern and have such things as database connections in the defaults or closure of the task-function, or the bound instance, no? Did I misunderstand? Further, the user may want to access all of the niceties of Pool, like
You just said that the dominant use-case was mapping a single task-function. It sounds like we're talking past each other in some way. It'll help to have a concrete example of a case that satisfies all the characteristics you've described: (1) no globals used for communication between initializer and task-functions; (2) single task-function, mapped once; (3) an instance-method as task-function, causing a large serialization burden; and (4) did I miss anything?
I believe you.
You're quite eager to have this PR merged. I understand that. However, it's reasonable to take some time to discuss the design of what you're proposing. You don't need it in the stdlib to get your own work done, nor to share it with others.

On Mon, Oct 22, 2018 at 2:01 PM Michael Selik <mike@selik.org> wrote:
This thread seems more appropriate for python-ideas than python-dev.
You're right, it's really only use cases (2) and (3) that define this spec. However, the case for subclassing really boils down to the "free" inheritance of the public methods of Pool (map, imap, imap_unordered, etc...). Why exclude these (by implementing "procmap()") if we get this great return with such little investment?
I am just eager to solve this problem, which is likely evident, given that this is the 3rd different implementation discussed in detail since my initial PR. If the group consensus is that this is best implemented via "procmap" function in github gist, then the idea will live there, and likely have a lonely life there. I contend that multiprocessing.Pool is used most frequently with a single task. I am proposing a feature that enforces this invariant, optimizes task memory-footprints & thus serialization time, and preserves the well-established interface to Pool through subclassing.

Hi, On Fri, 28 Sep 2018 17:07:33 -0400 Sean Harrington <seanharr11@gmail.com> wrote:
Thanks for taking the time to explain your use case and write a proposal. My reactions to this are: 1. The proposed API is ugly. This basically allows you to pass an argument which changes with which arguments another function is later called... 2. A global variable seems like the adequate way to represent a process-global object (which is exactly your use case). 3. If you don't like globals, you could probably do something like lazily-initialize the resource when a function needing it is executed; this also avoids creating the resource if the child doesn't use it at all. Would that work for you? As a more general remark, I understand the desire to make the Pool object more flexible, but we can also not pile up features until it satisfies all use cases. As another general remark, concurrent.futures is IMHO the preferred API for the future, and where feature work should probably concentrate. Regards Antoine.

Hi Antoine - see inline below for my response...thanks for your time! On Fri, Sep 28, 2018 at 6:45 PM Antoine Pitrou <solipsis@pitrou.net> wrote:
2. A global variable seems like the adequate way to represent a
process-global object (which is exactly your use case)
3. If you don't like globals, you could probably do something like them for every Pool use case. Further, if initializing the resource is expensive, we only want to do this ONE time per worker process. So no, this will not ~always~ work.

Hi Sean, On Fri, 28 Sep 2018 19:23:06 -0400 Sean Harrington <seanharr11@gmail.com> wrote:
IMHO, global variables don't break encapsulation if they remain private to the module where they actually play a role. Of course, there are also global-like alternatives to globals, such as class attributes... The multiprocessing module itself uses globals (or quasi-globals) internally for various implementation details.
That's what I meant with lazy initialization: initialize it if not already done, otherwise just use the already-initialized resource. It's a common pattern. (you can view it as a 1-element cache if you prefer)
Hmm... We might have a disagreement on the target audience of the multiprocessing module. multiprocessing isn't very high-level, I would expect it to be used by experienced programmers who know how to mutate a global variable from a lexical scope. For non-programmer end-users, such as data scientists, there are higher-level libraries such as Celery (http://www.celeryproject.org/) and Dask distributed (https://distributed.readthedocs.io/en/latest/). Perhaps it would be worth mentioning them in the documentation. Regards Antoine.

On Sat, Sep 29, 2018 at 6:24 AM Antoine Pitrou <solipsis@pitrou.net> wrote:
Yes, class attributes are a viable alternative. I've written about this here. <https://thelaziestprogrammer.com/python/multiprocessing-pool-a-global-soluti...> Still, the argument is not against global variables, class attributes or any close cousins -- it is simply that developers shouldn't be forced to use these.
It is one thing to MUTATE a global from a lexical scope. No gripes there. The specific concept I'm referencing here is "DECLARING a global variable from within a lexical scope". This is not as intuitive for most programmers.

On Sat, 29 Sep 2018 08:13:19 -0400 Sean Harrington <seanharr11@gmail.com> wrote:
Well, you don't have to. You can bind it to None in the top-level scope and then mutate it from the lexical scope:

    my_resource = None

    def do_work():
        global my_resource
        my_resource = ...

Regards Antoine.

On Fri, Sep 28, 2018 at 2:11 PM Sean Harrington <seanharr11@gmail.com> wrote:
The parameter name you chose, "initret", is awkward, because nowhere else in Python does an initializer return a value. Initializers mutate an encapsulated scope. For a class __init__, that scope is an instance's attributes. For a subprocess managed by Pool, that encapsulated scope is its "globals". I'm using quotes to emphasize that these "globals" aren't shared. On Fri, Sep 28, 2018 at 4:39 PM Sean Harrington <seanharr11@gmail.com> wrote:
We must have a different concept of "lazily-initialize". I understood Antoine's suggestion to be a one-time initialize per worker process. On Fri, Sep 28, 2018 at 4:39 PM Sean Harrington <seanharr11@gmail.com> wrote:
My simple argument is that the developer should not be constrained to make the objects passed globally available in the process, as this MAY break encapsulation for large projects.
I could imagine someone switching from Pool to ThreadPool and getting into trouble, but in my mind using threads is caveat emptor. Are you worried about breaking encapsulation in a different scenario?

On Sat, Sep 29, 2018 at 5:24 AM Sean Harrington <seanharr11@gmail.com> wrote:
Echoing Antoine: If you want some functions to not have access to a module's globals, you can put those functions in a different module. Note that multiprocessing already encapsulates each subprocess's globals in essentially a separate namespace. Without a specific example, this discussion is going to go around in circles. You have a clear aversion to globals. Antoine and I do not. No one else seems to have found this conversation interesting enough to participate, yet.

Hi guys - The solution to "lazily initialize" an expensive object in the worker process (e.g. via @lru_cache) is a great solution (that I must admit I did not think of). Additionally, in the second use case of "*passing a large object to each worker process*", I also agree with your suggestion to "shelter functions in a different module to avoid exposure to globals" as a good solution if one is wary of globals. That said, I still think "*passing a large object from parent process to worker processes*" should be easier when using Pool. Would either of you be open to something like the following?

    def func(x, big_cache=None):
        return big_cache[str(x)]

    big_cache = {str(k): k for k in range(10000)}
    ls = [i for i in range(1000)]

    with Pool(func_kwargs={"big_cache": big_cache}) as pool:
        pool.map(func, ls)

It's a much cleaner interface (which presumably requires a more difficult implementation) than my initial proposal. This also reads a lot better than the "initializer + global" recipe (clear flow of data), and is less constraining than the "define globals in parent" recipe. Most importantly, when taking sequential code and parallelizing via Pool.map, this does not force the user to re-implement "func" such that it consumes a global (rather than a kwarg). It allows "func" to be used elsewhere (e.g. in the parent process, from a different module, testing w/o globals, etc.). This would essentially be an efficient implementation of Pool.starmap(), where kwargs are static, and passed to each application of "func" over our iterable. Thoughts? On Sat, Sep 29, 2018 at 3:00 PM Michael Selik <mike@selik.org> wrote:

Starmap will serialize/deserialize the “big object” once for each task created, so this is not performant. The goal is to pay the “one time cost” of serialization of the “big object”, and still pass this object to func at each iteration. On Thu, Oct 4, 2018 at 4:14 AM Michael Selik <mike@selik.org> wrote:

On Wed, Oct 3, 2018 at 6:30 PM, Sean Harrington <seanharr11@gmail.com> wrote:
    with Pool(func_kwargs={"big_cache": big_cache}) as pool:
        pool.map(func, ls)

I feel like it would be nicer to spell this:

    with Pool() as pool:
        pool.map(functools.partial(func, big_cache=big_cache), ls)

And this might also solve your problem, if pool.map is clever enough to only send the function object once to each worker? -n -- Nathaniel J. Smith -- https://vorpus.org
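Nathaniel's spelling keeps `func` unchanged, but it does not by itself shrink what gets shipped. A `functools.partial` pickles its bound keyword arguments, so the whole `big_cache` dict travels every time the partial is serialized. The small sketch below compares pickled sizes. Exact byte counts vary by Python version and pickle protocol.

```python
import functools
import pickle


def func(x, big_cache=None):
    return big_cache[str(x)]


big_cache = {str(k): k for k in range(10_000)}
bound = functools.partial(func, big_cache=big_cache)

# A module-level function is pickled by reference (module + qualified name).
plain_size = len(pickle.dumps(func))
# The partial also serializes its keyword arguments, i.e. the entire dict.
partial_size = len(pickle.dumps(bound))

print(f"func: {plain_size} bytes, partial: {partial_size} bytes")
```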

Hi Nathaniel - if this solution can be made performant, then I would be more than satisfied. I think this would require removing "func" from the "task tuple", and storing the "func" "once per worker" somewhere globally (maybe a class attribute set post-fork?). This also has the beneficial outcome of increasing general performance of Pool.map and friends. I've seen MANY folks across the interwebs doing things like passing instance methods to map, resulting in "big" tasks, and slower-than-sequential parallelized code. Parallelizing "instance methods" by passing them to map, w/o needing to wrangle with staticmethods and globals, would be a GREAT feature! It'd just be as easy as: Pool.map(self.func, ls). What do you think about this idea? This is something I'd be able to take on, assuming I get a few core dev blessings... On Thu, Oct 4, 2018 at 6:15 AM Nathaniel Smith <njs@pobox.com> wrote:
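Here is a small sketch of why passing instance methods makes tasks "big": pickling a bound method pickles the instance it is bound to, so a large `self` is serialized with every task that carries `self.func`. The `Model` class and its `weights` attribute are hypothetical, and exact byte counts depend on the Python version and pickle protocol.

```python
import pickle


class Model:
    """Hypothetical class holding a large amount of state."""

    def __init__(self, n):
        self.weights = list(range(n))

    def func(self, x):
        return self.weights[x] * 2


small, big = Model(10), Model(100_000)

# Pickling a bound method pickles its instance (including __dict__), so any
# task carrying big.func also carries the whole big.weights list.
small_size = len(pickle.dumps(small.func))
big_size = len(pickle.dumps(big.func))
print(small_size, big_size)
```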

On Fri, 12 Oct 2018 08:33:32 -0400 Sean Harrington <seanharr11@gmail.com> wrote:
Well, I'm not sure how it would work, so it's difficult to give an opinion. How do you plan to avoid passing "self"? By caching (by equality? by identity?)? Something else? But what happens if "self" changed value (in the case of a mutable object) in the parent? Do you keep using the stale version in the child? That would break compatibility... Regards Antoine.

The implementation details need to be fleshed out, but agnostic of these, do you believe this is a valid solution to the initial problem? Do you also see it as a beneficial optimization to Pool, given that we don't need to store funcs/bound-methods/partials on the tasks themselves? The latter concern about "what happens if `self` changed value in the parent" is the same concern as "what happens if `func` changes in the parent?" given the current implementation. This is an assumption that is currently made with Pool.map_async(func, ls). If "func" changes in the parent, there is no communication with the child. So one just needs to be aware that calling "map_async(self.func, ls)" while the state of "self" is changing will not communicate changes to each worker. The state is frozen when Pool.map is called, just as is the case now. On Fri, Oct 12, 2018 at 9:07 AM Antoine Pitrou <solipsis@pitrou.net> wrote:

On 12/10/2018 at 15:17, Sean Harrington wrote:
I'm not sure, TBH. I also think it may be better to leave this to higher levels (for example Dask will intelligently distribute data on workers and let you work with a kind of proxy object in the main process, transferring data only when necessary).
If you cache "self" between pool.map calls, then the question is not "what happens if self changes *during* a map() call" but "what happens if self changes *between* two map() calls"? While the former is intuitively undefined, current users would expect the latter to have a clear answer, which is: the latest version of self when map() is called is taken into account. Regards Antoine.

I would contend that this is much more granular than Dask - this is just an optimization of Pool.map() to avoid redundantly passing the same `func` repeatedly, once per task, to each worker, with the primary goal of eliminating redundant serialization of large-memory-footprinted Callables. This is a different use case than Dask - I don't intend to approach the shared memory or distributed computing realms. And the second call to Pool.map would update the cached "self" as a part of its initialization workflow, s.t. "the latest version of self when map() is called is taken into account". Do you see a difficulty in accomplishing the second behavior? On Fri, Oct 12, 2018 at 9:25 AM Antoine Pitrou <antoine@python.org> wrote:

On 12/10/2018 at 16:49, Sean Harrington wrote:
Only if it has changed, then, right? I suspect that would work, but it will break compatibility in some cases (think of a mutable object that hasn't defined equality - so it defaults to identity). It's also introducing a complication in the API which people didn't have to think of before. The fact that you're doing all this in order to eschew global variables for global resources doesn't warm me much to the idea. Unless other core developers are enthusiastic I'm not willing to integrate such a change. Regards Antoine.

On Fri, Oct 12, 2018, 06:09 Antoine Pitrou <solipsis@pitrou.net> wrote:
I was just suggesting that within a single call to Pool.map, it would be a reasonable optimization to only send the fn once to each worker. So e.g. if you have 5 workers and 1000 items, you'd only pickle fn 5 times, rather than 1000 times like we do now. I wouldn't want to get any fancier than that with caching data between different map calls or anything. Of course even this may turn out to be too complicated to implement in a reasonable way, since it would require managing some extra state on the workers. But semantically it would be purely an optimization of current semantics. -n

@Nathaniel this is what I am suggesting as well. No caching - just storing the `fn` on each worker, rather than pickling it for each item in our iterable. As long as we store the `fn` post-fork on the worker process (perhaps as a global), subsequent calls to Pool.map shouldn't be affected (referencing Antoine's & Michael's points that "multiprocessing encapsulates each subprocess's globals in a separate namespace"). @Antoine - I'm making an effort to take everything you've said into consideration here. My initial PR and talk <https://www.youtube.com/watch?v=DH0JVSXvxu0> were intended to shed light on a couple of pitfalls that I often see Python end-users encounter with Pool. Moving beyond my naive first attempt, and the onslaught of deserved criticism, it seems that we have an opportunity here: no changes to the interface, just an optimization to reduce the frequency of pickling. Raymond Hettinger may also be interested in this optimization, as he speaks (with great analogies) about different ways you can misuse concurrency in Python <https://www.youtube.com/watch?v=9zinZmE3Ogk>. This would address one of the pitfalls that he outlines: the "size of the serialized/deserialized data". Is this an optimization that either of you would be willing to review, and accept, if I find there is a *reasonable way* to implement it? On Fri, Oct 12, 2018 at 3:40 PM Nathaniel Smith <njs@pobox.com> wrote:

Would this change the other pool method behavior in some way if the user, for whatever reason, mixed techniques? imap_unordered will only block when nexting the generator. If the user mingles nexting that generator with, say, apply_async, could the change you're proposing have some side-effect? On Tue, Oct 16, 2018, 5:09 AM Sean Harrington <seanharr11@gmail.com> wrote:

Is your concern something like the following?

    with Pool(8) as p:
        gen = p.imap_unordered(func, ls)
        first_elem = next(gen)
        p.apply_async(long_func, (x,))
        remaining_elems = [elem for elem in gen]

...here, if we store "func" on each worker Process as a global, and execute this pattern above, we will likely alter state of one of the worker processes s.t. it stores "long_func" in place of the initial "func". So yes, this could break things. *A potential solution*: Replace "func" in the task tuple with an identifier (maybe, *perhaps
This would avoid the weird stateful bug above. We could also do something slightly different to this if folks are averse to the "Pool class-attribute func map" (i.e. averse to globals), and store this map as an Instance Attribute on the Pool object, and wrap the "initializer" func to make the map globally available in the worker via the "global" keyword. One note: this isn't a "cache", it's just a global map which has its keys & values updated *blindly* with every call to Pool.<public_method>. It serves as a way to bypass repeated serialization of functions in Pool, which can be large when bound to big objects (like large class instances, or functools.partial objects). On Tue, Oct 16, 2018 at 9:27 AM Michael Selik <michael.selik@gmail.com> wrote:

If imap_unordered is currently re-pickling and sending func each time it's called on the worker, I have to suspect there was some reason to do that and not cache it after the first call. Rather than assuming that's an opportunity for an optimization, I'd want to be certain it won't have edge case negative effects. On Tue, Oct 16, 2018 at 2:53 PM Sean Harrington <seanharr11@gmail.com> wrote:
My concern was passing the same function (or a function with the same qualname). You're suggesting caching functions and identifying them by qualname to avoid re-pickling a large stateful object that's shoved into the function's defaults or closure. Is that a correct summary? If so, how would the function cache distinguish between two functions with the same name? Would it need to examine the defaults and closure as well? If so, that means it's pickling the second one anyway, so there's no efficiency gain.

    In [1]: def foo(a):
       ...:     def bar():
       ...:         print(a)
       ...:     return bar
    In [2]: f = foo(1)
    In [3]: g = foo(2)
    In [4]: f
    Out[4]: <function __main__.foo.<locals>.bar()>
    In [5]: g
    Out[5]: <function __main__.foo.<locals>.bar()>

If we say pool.apply_async(f) and pool.apply_async(g), would you want the latter one to avoid serialization, letting the worker make a second call with the first function object?

You have correctly identified the summary of my intentions, and I agree with your reasoning & concern - however there is a somewhat reasonable answer as to why this optimization has never been implemented: In Pool, the `task` tuple consists of (result_job, func, (x,), {}) . This is the object that is serialized/deserialized b/t processes. The only thing we really care about here is the tuple `(x,)`, confusingly, not `func` (func is ACTUALLY either mapstar() or starmapstar(), which is called with (x,) as its *args). Our element of interest is `(x,)` - a tuple of (func, iterable). Because we need to temper the size of the `iterable` bundled in each task, to avoid de/serialization slowness, we usually end up with multiple tasks per worker, and thus multiple `func`s per worker. Thus, this is really only an optimization in the case of really big functions/closures/partials (or REALLY big iterables with an unreasonably small chunksize passed to map()). The most common use case comes up when passing instance methods (of really big objects!) to Pool.map(). This post <https://thelaziestprogrammer.com/python/a-multiprocessing-pool-pickle#stuck-...> may color in the above with more details. Further, let me pivot on my idea of __qualname__...we can use the `id` of `func` as the cache key to address your concern, and store this `id` on the `task` tuple (i.e. an integer in-lieu of the `func` previously stored there). On Thu, Oct 18, 2018 at 12:49 AM Michael Selik <michael.selik@gmail.com> wrote:
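To put numbers on "multiple tasks per worker": the sketch below mirrors the default chunksize heuristic in CPython's `Pool.map` (found in `Pool._map_async`, an implementation detail that may change). It counts how many times `func` is pickled, since each chunk travels as one task bundling `(func, chunk)`. The helper names are hypothetical.

```python
import math


def default_chunksize(n_items, n_workers):
    """Mirror of the default chunksize heuristic in CPython's Pool.map
    (Pool._map_async); an implementation detail that may change."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def func_pickles_per_map(n_items, n_workers, chunksize=None):
    """Number of times func is pickled for one Pool.map call.

    Each chunk is sent as one task that bundles (func, chunk), so func is
    serialized once per chunk, not once per item or once per worker.
    """
    if n_items == 0:
        return 0
    if chunksize is None:
        chunksize = default_chunksize(n_items, n_workers)
    return math.ceil(n_items / chunksize)
```

Consider Nathaniel's example of 5 workers and 1,000 items. The default heuristic gives chunks of 50, so `map` would pickle `fn` 20 times. The 1,000 figure applies with `chunksize=1`, which is the default for `imap` and `imap_unordered`.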

On Thu, Oct 18, 2018 at 8:35 AM Sean Harrington <seanharr11@gmail.com> wrote:
The most common use case comes up when passing instance methods (of really big objects!) to Pool.map().
This reminds me of that old joke: "A patient says to the doctor, 'Doctor, it hurts when I ...!' The doctor replies, 'Well, don't do that.'"
Further, let me pivot on my idea of __qualname__...we can use the `id` of
Possible. Does the Pool keep a reference to the passed function in the main process? If not, couldn't the garbage collector free that memory location and a new function could replace it? Then it could have the same qualname and id in CPython. Edge case, for sure. Worse, it'd be hard to reproduce as it'd be dependent on the vagaries of memory allocation.

One idea would be for the Pool method to generate a uuid and slap it on the function as an attribute. If a function being passed in doesn't have one, generate one. If it already has one, just pass that instead of pickling. The child process will keep a cache mapping uuids to functions. I'm still worried about unintended consequences. On Thu, Oct 18, 2018 at 9:00 AM Michael Selik <michael.selik@gmail.com> wrote:
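Here is a rough, single-process sketch of the UUID-tagging idea. It assumes the parent knows which worker will receive each task. `multiprocessing.Pool`'s shared task queue does not provide this, which is the scheduling concern raised elsewhere in this thread. `tag_callable`, `ParentSide`, `WorkerSide` and the `_pool_func_uuid` attribute are all hypothetical. The sketch also exposes one limitation related to Chris's point: bound methods and builtins reject new attributes, so tagging them raises `AttributeError`.

```python
import functools
import pickle
import uuid

# Hypothetical attribute name for the tag the parent attaches to a callable.
_TAG_ATTR = "_pool_func_uuid"


def tag_callable(func):
    """Parent side: return func's UUID, generating and attaching one if missing.

    setattr raises AttributeError for callables that reject new attributes,
    such as bound methods and builtins; a real design would need a fallback.
    """
    token = getattr(func, _TAG_ATTR, None)
    if token is None:
        token = uuid.uuid4().hex
        setattr(func, _TAG_ATTR, token)
    return token


class ParentSide:
    """Remembers which (worker, function) pairs have already been sent."""

    def __init__(self):
        self._sent = set()

    def encode(self, worker_id, func):
        token = tag_callable(func)
        if (worker_id, token) in self._sent:
            return token, None  # worker already has it: send only the UUID
        self._sent.add((worker_id, token))
        return token, pickle.dumps(func)  # first time for this worker: send bytes


class WorkerSide:
    """Per-worker cache mapping UUIDs to unpickled callables."""

    def __init__(self):
        self._funcs = {}

    def decode(self, token, payload):
        if payload is not None:
            self._funcs[token] = pickle.loads(payload)
        return self._funcs[token]  # KeyError: this worker never got the function


# Demo in one process: functools.partial objects accept new attributes and
# pickle by value, so they can be tagged.
parent, worker = ParentSide(), WorkerSide()
power_of_two = functools.partial(pow, 2)
first = parent.encode(0, power_of_two)   # (uuid, pickled bytes)
second = parent.encode(0, power_of_two)  # (same uuid, None)
```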

On Thu, Oct 18, 2018 at 9:11 AM Michael Selik <michael.selik@gmail.com> wrote:
I'm not following this thread closely, but I just wanted to point out that __qualname__ won't necessarily be an attribute of the object if the API accepts any callable. (I happen to be following an issue on the tracker where this came up.) --Chris

Hello, I have been working on the concurrent.futures module lately and I think this optimization should be avoided in the context of Python Pools. This is an interesting idea; however, its implementation will bring many complicated issues, as it breaks the basic paradigm of a Pool: the tasks are independent and you don't know which worker is going to run which task. The function is serialized with each task because of this paradigm. This ensures that any worker picking up the task will be able to perform it independently of the tasks it has run before, given that it has been initialized correctly at the beginning. This makes it simple to run each task. As the Pool comes with no scheduler, with your idea, you would need a synchronization step to send the function to all workers before you can launch your task. But if there is already one worker performing a long-running task, does the Pool wait for it to be done before it sends the function? If the Pool doesn't wait, how does it ensure that this worker will be able to get the definition of the function before running it? Also, multiprocessing.Pool has some features where a worker can shut itself down after a given number of tasks or a timeout. How does it ensure that the new worker will have the definition of the function? It is unsafe to try such a feature (sending an object only once) anywhere other than in the initializer, which is guaranteed to be run once per worker. On the other hand, you mentioned an interesting point: making globals available in the workers could be made simpler. A possible solution would be to add a "globals" argument to the Pool which would instantiate global variables in the workers. I have no specific idea on the implementation of such a feature, but it would be safer as it would be an initialization feature. Regards, Thomas Moreau On Thu, Oct 18, 2018, 22:20 Chris Jerdonek <chris.jerdonek@gmail.com> wrote:

On Fri, Oct 19, 2018 at 9:09 AM Thomas Moreau <thomas.moreau.2010@gmail.com> wrote:
I would not mind if there would be a subtype of Pool where you can only apply one kind of task to. This is a very common use mode. Though the question there is 'should this live in Python itself'? I'd be fine with a package on PyPI.
As the Pool comes with no scheduler, with your idea, you would need a
Would this also mean one could use a Pool in a context where threading is used? Currently using threading side effects unpicklables into the globals. Also being able to pass in globals=None would be optimal for a lot of use cases. -- Joni Orponen

On Fri, Oct 19, 2018 at 7:32 AM Joni Orponen <j.orponen@4teamwork.ch> wrote:
Though the question there is 'should this live in Python itself'? I'd be fine with a package on PyPI.
Thomas makes a good point: despite the common user mode of calling Pool.map() once, blocking, and returning, the need for serialization of functions within tasks arises when calling Pool.map() (and friends) while workers are still running (i.e. there was a previous call to Pool.map_async()). However, this is an uncommon user mode, as Joni points out. The most common user mode is that your Pool workers are only ever executing one type of task at a given time. This opens up optimization opportunities, so long as we store state on the subclassed Pool object (SingleTaskPool?) of whether or not a SingleTask is running or has been completed, to prevent the user from getting into the funky state above. I would rather see this included in the multiprocessing stdlib, as it seemingly will not introduce a lot of new code and would benefit from existing tests. Most importantly, it optimizes (in my opinion) the most common user mode of Pool.
We could do this - but we can easily get the same behavior by declaring a "global" in "initializer" (albeit a pattern which I do not like). I like the idea to extend the Pool class a bit more, but this is also my opinion.

On Fri, Oct 19, 2018 at 5:01 AM Sean Harrington <seanharr11@gmail.com> wrote:
I like the idea to extend the Pool class [to optimize the case when only one function is passed to the workers].
Why would this keep the same interface as the Pool class? If its workers are restricted to calling only one function, that should be passed into the Pool constructor. The map and apply methods would then only receive that function's args and not the function itself. You're also trying to avoid the initializer/globals pattern, so you could eliminate that parameter from the Pool constructor. In fact, it sounds more like you'd want a function than a class. You can call it "procmap" or similar. That's code I've written more than once.

    results = procmap(func, iterable, processes=cpu_count())

The nuance is that, since there's no explicit context manager, you'll want to ensure the pool is shut down after all the tasks are finished, even if the results generator hasn't been fully consumed.
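Here is a minimal sketch of the `procmap` helper described above, assuming ordered results via `Pool.imap` are wanted. `chunksize` is an illustrative extra parameter beyond Michael's `processes`. Writing it as a generator with the pool inside a `with` block handles the shutdown nuance, because leaving the block calls `Pool.__exit__`, which terminates the pool.

```python
from multiprocessing import Pool


def procmap(func, iterable, processes=None, chunksize=1):
    """Lazily yield func(x) for each x in iterable, in input order.

    The pool is only created on the first next() call. Leaving the with-block,
    whether by exhausting the results, by close(), or by the generator being
    garbage-collected, runs Pool.__exit__, which calls terminate().
    """
    # processes=None lets Pool use os.cpu_count() workers.
    with Pool(processes) as pool:
        yield from pool.imap(func, iterable, chunksize)
```

A caller who stops early, e.g. `next(procmap(func, items))` followed by dropping the generator, still gets the workers torn down once the generator is closed or collected.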

Michael - the initializer/globals pattern still might be necessary if you need to create an object AFTER a worker process has been instantiated (e.g. a database connection). Further, the user may want to access all of the niceties of Pool, like imap, imap_unordered, etc. The goal (IMO) would be to preserve an interface which many Python users have grown accustomed to, and to allow them to access this optimization out of the box. Having talked to folks at the Boston Python meetup, folks on my dev team, and perusing Stack Overflow, this "instance method parallelization" is a pretty common pattern that is oftentimes a negative return on investment for the developer, due to the implicit implementation detail of pickling the function (and object) once per task. Is anyone open to reviewing a PR concerning this optimization of Pool, delivered as a subclass? This feature restricts the number of unique tasks being executed by workers at once to 1, while allowing aggressive subprocess-level function caching to prevent repeated serialization/deserialization of large functions/closures. The use case is s.t. the user only ever needs 1 call to Pool.map(func, ls) (or friends) executing at once, when `func` has a non-trivial memory footprint. On Fri, Oct 19, 2018 at 4:06 PM Michael Selik <mike@selik.org> wrote:

This thread seems more appropriate for python-ideas than python-dev. On Mon, Oct 22, 2018 at 5:28 AM Sean Harrington <seanharr11@gmail.com> wrote:
You said you wanted to avoid the initializer/globals pattern and have such things as database connections in the defaults or closure of the task-function, or the bound instance, no? Did I misunderstand?
Further, the user may want to access all of the niceties of Pool, like
You just said that the dominant use-case was mapping a single task-function. It sounds like we're talking past each other in some way. It'll help to have a concrete example of a case that satisfies all the characteristics you've described: (1) no globals used for communication between initializer and task-functions; (2) single task-function, mapped once; (3) an instance-method as task-function, causing a large serialization burden; and (4) did I miss anything?
I believe you.
You're quite eager to have this PR merged. I understand that. However, it's reasonable to take some time to discuss the design of what you're proposing. You don't need it in the stdlib to get your own work done, nor to share it with others.

On Mon, Oct 22, 2018 at 2:01 PM Michael Selik <mike@selik.org> wrote:
This thread seems more appropriate for python-ideas than python-dev.
You're right, it's really only use cases (2) and (3) that define this spec. However, the case for subclassing really boils down to the "free" inheritance of the public methods of Pool (map, imap, imap_unordered, etc...). Why exclude these (by implementing "procmap()") if we get this great return with such little investment?
I am just eager to solve this problem, which is likely evident, given that this is the 3rd different implementation discussed in detail since my initial PR. If the group consensus is that this is best implemented via a "procmap" function in a GitHub gist, then the idea will live there, and likely have a lonely life there. I contend that multiprocessing.Pool is used most frequently with a single task. I am proposing a feature that enforces this invariant, optimizes task memory footprints & thus serialization time, and preserves the well-established interface to Pool through subclassing.
participants (9)
- Antoine Pitrou
- Antoine Pitrou
- Chris Jerdonek
- Joni Orponen
- Michael Selik
- Michael Selik
- Nathaniel Smith
- Sean Harrington
- Thomas Moreau