[IPython-dev] IPython Parallel Context Manager
David Hirschfeld
dave.hirschfeld at gmail.com
Fri Jan 11 10:19:11 EST 2013
I've created a context manger which can be used to instantiate and
shutdown an IPython cluster. I'm posting it here in the hope that
others may find it useful and also for any feedback on how it could be
implemented better, especially in regards to the ugly sleep calls
where I found no better way of waiting until the client had connected
to the instantiated engines.
Demo code below and context manager below that.
HTH,
Dave
```
In [4]: from IPython.utils.path import locate_profile
...: from IPython.parallel.apps.launcher import
LocalControllerLauncher, LocalEngineLauncher
...: from IPython.parallel import Client
In [5]: rc = Client() # No controller running, expect error
Traceback (most recent call last):
File "<ipython-input-5-3adcc54b8411>", line 1, in <module>
rc = Client()
File "c:\dev\code\ipython\IPython\parallel\client\client.py", line
409, in __init__
with open(url_file) as f:
IOError: [Errno 2] No such file or directory:
u'C:\\Users\\dhirschfeld\\.ipython\\profile_default\\security\\ipcontroller-client.json'
In [6]: with ParallelView(nengines=4, start_hub=True) as view:
...: print view.targets
...: #
...:
Starting LocalControllerLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 7208
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 4292
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 6320
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 8804
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 5712
[0, 1, 2, 3]
In [7]: rc = Client() # Instantiated controller shutdown on exit, expect error
Traceback (most recent call last):
File "<ipython-input-7-3adcc54b8411>", line 1, in <module>
rc = Client()
File "c:\dev\code\ipython\IPython\parallel\client\client.py", line
409, in __init__
with open(url_file) as f:
IOError: [Errno 2] No such file or directory:
u'C:\\Users\\dhirschfeld\\.ipython\\profile_default\\security\\ipcontroller-client.json'
In [8]: controller = LocalControllerLauncher(profile_dir=locate_profile())
...: controller.start()
In [9]: with ParallelView(nengines=4) as view:
...: print view.targets
...: #
...:
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 8580
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 7524
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 11064
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 8144
[0, 1, 2, 3]
In [10]: rc = Client() # Engines only shutdown on exit, expect empty list
...: rc.ids
Out[10]: []
In [11]: engine = LocalEngineLauncher(profile_dir=locate_profile())
...: engine.start()
In [12]: engine = LocalEngineLauncher(profile_dir=locate_profile())
...: engine.start()
In [13]: rc.ids
Out[13]: [4, 5]
In [14]: with ParallelView(nengines=4) as view: # Expect only
instantiated engines in targets
...: print view.targets
...: #
...:
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 6344
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 7532
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 10032
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 10236
[8, 9, 6, 7]
In [15]: rc.ids # Instantiated engines shutdown, original engines remain
Out[15]: [4, 5]
In [16]:
```
class ParallelView(object):
def __init__(self, nengines=1,
start_hub=False,
type='load_balanced',
profile='default',
logger=None,
delay=0.1):
"""Context mangager which will return a view [1]_ on an IPython
cluster, optionally instantiating one if it is not already
running.
Parameters
----------
nengines : int
The number of engine instances to instantiate
start_hub : bool
Whether or not to instantiate the controller as well
type : str
Either 'load_balanced' or 'direct'
profile : str
The name of the IPython to use
logger : a `logging.Logger` instance
If None a default logger writing to `sys.stdout` will be used
delay : float
How long between engine instantiations to delay
References
----------
.. [1] http://ipython.org/ipython-doc/dev/api/generated/IPython.parallel.client.view.html#IPython.parallel.client.view.View
"""
import time
from IPython.utils.path import locate_profile
from IPython.config import PyFileConfigLoader
self.rc = None
self.ids = None
self.delay = delay
self.nengines = int(nengines)
self.engines = []
self.type = str(type).lower()
if self.type not in ('load_balanced','direct'):
msg = "Expected `type` to be one of 'load_balanced' or
'direct' but got %s." % type
raise ValueError(msg)
if profile is None:
self.profile_dir = locate_profile()
else:
self.profile_dir = locate_profile(profile=profile)
self.profile = profile
self.config = PyFileConfigLoader('ipython_config.py',
self.profile_dir).load_config()
if logger is None:
import logging
import sys
logger = logging.Logger('ipcluster')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
self.logger = logger
if start_hub:
from IPython.parallel.apps.launcher import LocalControllerLauncher
controller = LocalControllerLauncher(config=self.config,
log=self.logger,
profile_dir=self.profile_dir)
controller.start()
time.sleep(2)
self.start_hub = bool(start_hub)
#
def __enter__(self):
from IPython.parallel.apps.launcher import LocalEngineLauncher
from IPython.parallel import Client
import time
self.rc = Client(profile=self.profile)
self.ids = set(self.rc.ids)
for i in range(self.nengines):
engine = LocalEngineLauncher(config=self.config,
log=self.logger,
profile_dir=self.profile_dir)
self.engines.append(engine)
engine.start()
time.sleep(self.delay)
MAX_LOOPS = 20
while len(set(self.rc.ids).difference(self.ids)) <
sum(e.running for e in self.engines):
time.sleep(10*self.delay)
MAX_LOOPS -= 1
if MAX_LOOPS < 0:
break
self.ids = list(set(self.rc.ids).difference(self.ids))
if self.type == 'load_balanced':
return self.rc.load_balanced_view(self.ids)
return self.rc[self.ids]
#
def __exit__(self, *exc_info):
self.rc.shutdown(self.ids, hub=self.start_hub)
#
More information about the IPython-dev
mailing list