Hello,
I coincidentally started my own implementation of a system to manage
intermediate results last week, which I called jug. I wasn't planning to make
such an alpha version public just yet, but it seems to be on topic.
The main idea is to use hashes to map function arguments to paths on the
filesystem, which store the result (nothing extraordinary here). I also added
the capability for tasks (the basic unit) to take the results of other tasks
as arguments, defining an implicit dependency DAG. A simple locking mechanism
enables light-weight task-level parallelisation (this was the second of my
goals: to help me make my stuff parallel).
A trick that helps is that I don't actually use the argument values for
hashing (which would be unwieldy for big arrays). Instead, I hash the
computation path (e.g., this is the value obtained from
*f(g('something'),2)*). Since, at least in my problems, things tend to always
map back to simple file-system paths, computing the hash doesn't even require
loading the intermediate results.
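In code, the idea is roughly the following (a simplified sketch, not jug's
actual implementation; the *hash()* method on tasks is a stand-in for
whatever the real interface looks like):

::

    import hashlib

    def hash_task(f, args):
        # Hash the *computation path*: the function's name plus, for each
        # argument, either its literal representation or, if the argument
        # is itself a task, that task's own hash. Big intermediate values
        # never need to be loaded from disk to compute this.
        h = hashlib.sha1()
        h.update(f.__name__)
        for arg in args:
            if hasattr(arg, 'hash'):    # the argument is another task
                h.update(arg.hash())
            else:                       # a literal value
                h.update(repr(arg))
        return h.hexdigest()            # also used as the path on disk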
I will make the git repository publicly available once I figure out how to do
that.
I append the tutorial I wrote, which explains the system.
HTH,
Luís Pedro Coelho
PhD Student in Computational Biology
Carnegie Mellon University
============
Jug Tutorial
============
What is jug?
------------
Jug is a simple way to write easily parallelisable programs in Python. It also
handles intermediate results for you.
Example
-------
This is a simple worked-through example which illustrates what jug does.
Problem
~~~~~~~
Assume that I want to do the following to a collection of images:
(1) for each image, compute some features
(2) cluster these features using k-means. In order to find out the number
of clusters, I try several values and pick the best result. For each value of
k, because of the random initialisation, I run the clustering 10 times.
I could write the following simple code:
::
    imgs = glob('*.png')
    features = [computefeatures(img,parameter=2) for img in imgs]
    clusters = []
    bics = []
    for k in xrange(2,200):
        for repeat in xrange(10):
            clusters.append(kmeans(features,k=k,random_seed=repeat))
            bics.append(compute_bic(clusters[-1]))
    Nr_clusters = argmin(bics) // 10
Very simple and solves the problem. However, if I want to take advantage of
the obvious parallelisation of the problem, then I need to write much more
complicated code. My traditional approach is to break this down into smaller
scripts. I'd have one to compute features for some images, I'd have another to
merge all the results together and do some of the clustering, and, finally, one
to merge all the results of the different clusterings. These would need to be
called with different parameters to explore different areas of the parameter
space, so I'd have a couple of scripts just for calling the main computation
scripts. Intermediate results would be saved and loaded by the different
processes.
This has several problems. The biggest are
(1) The need to manage intermediate files. These are normally files with
long names like *features_for_img_0_with_parameter_P.pp*.
(2) The code gets much more complex.
There are minor issues with having to issue several jobs (and having the
cluster be idle in the meanwhile), or deciding on how to partition the jobs so
that they take roughly the same amount of time, but the two above are the main
ones.
Jug solves all these problems!
Tasks
~~~~~
The main unit of jug is a Task. Any function can be used to generate a Task. A
Task can depend on the results of other Tasks.
The original idea for jug was a Makefile-like environment for declaring Tasks.
I have moved beyond that, but it might help you think about what Tasks are.
You create a Task by giving it a function which performs the work and its
arguments. The arguments can be either literal values or other tasks (in which
case, the function will be called with the *result* of those tasks!). Jug also
understands lists of tasks (all standard Python containers will be supported
in a later version). For example, the following code declares the necessary
tasks for our problem:
::
    imgs = glob('*.png')
    feature_tasks = [Task(computefeatures,img,parameter=2) for img in imgs]
    cluster_tasks = []
    bic_tasks = []
    for k in xrange(2,200):
        for repeat in xrange(10):
            cluster_tasks.append(Task(kmeans,feature_tasks,k=k,random_seed=repeat))
            bic_tasks.append(Task(compute_bic,cluster_tasks[-1]))
    def nr_clusters(bics):
        return argmin(bics) // 10
    Nr_clusters = Task(nr_clusters,bic_tasks)
Task Generators
~~~~~~~~~~~~~~~
In the code above, there is a lot of code of the form *Task(function,args)*;
it would be nicer to write just *function(args)*. A simple helper function
makes this possible:
::
    from jug.task import Task

    def TaskGenerator(function):
        def gen(*args,**kwargs):
            return Task(function,*args,**kwargs)
        return gen

    computefeatures = TaskGenerator(computefeatures)
    kmeans = TaskGenerator(kmeans)
    compute_bic = TaskGenerator(compute_bic)

    @TaskGenerator
    def Nr_clusters(bics):
        return argmin(bics) // 10

    imgs = glob('*.png')
    features = [computefeatures(img,parameter=2) for img in imgs]
    clusters = []
    bics = []
    for k in xrange(2,200):
        for repeat in xrange(10):
            clusters.append(kmeans(features,k=k,random_seed=repeat))
            bics.append(compute_bic(clusters[-1]))
    Nr_clusters(bics)
You can see that this code is almost identical to our original sequential
code, except for the declarations at the top and the fact that *Nr_clusters*
is now a function (actually a TaskGenerator; note the use of a decorator).
This file is called the jugfile (you should name it *jugfile.py* on the
filesystem) and specifies your problem. Of course, *TaskGenerator* is already
part of jug, so those first few lines could have read
::
    from jug.task import TaskGenerator
Jug
~~~
So far, we have achieved seemingly little. We have turned a simple piece of
sequential code into something that generates Task objects, but does not
actually perform any work. The final piece is jug. Jug takes these Task objects
and runs them. Its main loop is basically
::
    while len(tasks) > 0:
        for t in tasks:
            if can_run(t): # ensures that all dependencies have been run
                if need_to_run(t) and not is_running(t):
                    t.run()
                tasks.remove(t)
If you run jug on the script above, you will simply have reproduced the
original code with the added benefit of having all the intermediate results
saved.
The interesting part is what happens when you run several instances of jug at
the same time. They will all start running Tasks, but each instance will run
different tasks. This allows you to take advantage of multiple processors in a
way that keeps them all occupied as long as there is work to be done, handles
the implicit dependencies, and passes functions the right values. Note also
that, unlike more traditional parallel processing frameworks (like MPI), jug
has no problem with the number of participating processors varying throughout
the job.
Behind the scenes, jug is using the filesystem to both save intermediate
results (which get passed around) and to lock running tasks so that each task
is only run once (the actual main loop is thus a bit more complex than shown
above).
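The lock itself can be as simple as an atomic file creation (again, a sketch
of the idea rather than jug's exact code; the on-disk layout is invented):

::

    import os

    def acquire_lock(lockdir, task_hash):
        # O_CREAT|O_EXCL guarantees that the create succeeds in exactly
        # one process, so each task is claimed by at most one jug instance.
        try:
            fd = os.open(os.path.join(lockdir, task_hash + '.lock'),
                         os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(fd)
            return True
        except OSError:
            return False

    def release_lock(lockdir, task_hash):
        os.remove(os.path.join(lockdir, task_hash + '.lock'))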
Intermediate and Final Results
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You can obtain the final results of your computation by setting up a task that
saves them to disk and loading them from there. If the results of your
computation are simple enough, this might be the simplest way.
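For example, something along these lines would save all the BIC values from
the example above (*save_results* is just a name made up for this sketch):

::

    import pickle

    @TaskGenerator
    def save_results(filename, results):
        # This is a task like any other, so jug will only run it once
        # all of its dependencies (here, the bic tasks) have completed.
        output = open(filename, 'wb')
        pickle.dump(results, output)
        output.close()

    save_results('bics.pkl', bics)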
Another way, which is also the way to access the intermediate results if you
want them, is to run the jug script and then call the *load()* method on
Tasks. For example,
::
    imgs = glob('*.png')
    features = [computefeatures(img,parameter=2) for img in imgs]
    ...
    feature_values = [feat.load() for feat in features]
If the values are not accessible, this raises an exception.
Advantages
----------
jug is an attempt to get something that works in the setting I have found
myself in: code that is *embarrassingly parallel* with a couple of points
where all the results of previous processing are merged, often in a simple
way. It is also a way for me to manage both the explosion of temporary files
that plagued my code and the brittleness of making sure that all results from
separate processors are merged correctly in my *ad hoc* scripts.
Limitations
-----------
This is not an attempt to replace MPI in any way. For code that has more merge
points, this won't do. It also won't do if the individual tasks are so small
that the overhead of managing them swamps the performance gains of
parallelisation. In my code, most of the time, each task takes twenty seconds
to a few minutes: long enough to make the management time irrelevant, but
short enough that the main job can be broken into thousands of small pieces.
The system makes it too easy to save all intermediate results and run out of
disk space.
This is still Python, not a true parallel programming language. The
abstraction will sometimes leak through, for example, if you try to pass a
Task to a function which expects a real value. Recall how we had to re-write
the line *Nr_clusters = argmin(bics) // 10* above.
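Concretely, with the example above:

::

    # bics is a list of Task objects, not of numbers, so this would
    # compute the argmin over Task objects -- not what we want:
    Nr_clusters = argmin(bics) // 10

    # The computation has to become a task of its own, so that it only
    # runs once the BIC values actually exist:
    @TaskGenerator
    def Nr_clusters(bics):
        return argmin(bics) // 10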
Planned Capabilities
--------------------
Here are a couple of simple improvements I plan to make at some point:
* jug.py cleanup: removes left-over locks, temporary files, and unused
  results.
* Stop & re-start. Currently, jug processes will exit if they can't make
  any progress for a while. In the future, I'd like other jug processes to
  be able to unblock them when progress becomes possible again.
* No-result tasks: Task-like objects that don't save intermediate results.
* Have tasks be passed inside *sets* and *dictionaries*. Maybe even
  *numpy* arrays! This will make jug even more like a real parallel
  programming language.
* If the original arguments are files on disk, then jug should check their
  modification dates and invalidate subsequent results.