As explained in the IPython documentation for parallel computing, IPython has built-in support for parallelism. This came as a result of the architectural overhaul that IPython received when the project finished migrating to ZeroMQ in 2011. The resulting architecture can be summarized with the following components, all of which are present in the IPython.parallel package:
The following figure shows the IPython architecture:
Thanks to the aforementioned architecture, IPython supports the following styles of parallelism:
Practically speaking, this allows IPython users to tackle the following use cases:
To use IPython for parallel computing, you need to initialize an instance of the controller and one or more instances of the engine. Initially, it is best to simply start a controller and engines on a single host by using the ipcluster command. To initialize a controller and four engines on your localhost, switch to a terminal window with this notebook's virtual environment activated and execute the following command:
$ ipcluster start -n 4
This will run the ipcluster app in the foreground and show the following output:
2015-04-28 09:50:29.584 [IPClusterStart] Using existing profile dir: '/Users/oubiwann/.ipython/profile_default'
2015-04-28 09:50:29.587 [IPClusterStart] Starting ipcluster with [daemon=False]
2015-04-28 09:50:29.589 [IPClusterStart] Creating pid file: /Users/oubiwann/.ipython/profile_default/pid/ipcluster.pid
2015-04-28 09:50:29.589 [IPClusterStart] Starting Controller with LocalControllerLauncher
2015-04-28 09:50:30.593 [IPClusterStart] Starting 4 Engines with LocalEngineSetLauncher
With an IPython cluster running, we're ready to start using it. There are several ways to interact with an IPython cluster. The connection is made with a client object; however, it is the client's views that provide the actual interaction with the cluster. The following views are available:
Though not a view per se, the IPython cluster also provides magic functions for interacting with clusters, and these act very much like a view. In an IPython Notebook, the parallel magic functions will often be what you want to use. For both the direct and load-balanced views, you will need to create a cluster client, as follows:
In [20]: from IPython.parallel import Client
         client = Client()
         client.ids
The client.ids attribute holds the IDs of each IPython cluster engine that was started by the preceding call to ipcluster.
The direct view is so called because the DirectView object offers an interface to the cluster that doesn't go through the schedulers. Instead, it offers direct, manual execution control.
Here's how you create a direct view from the client object:
In [21]: view = client.direct_view()
         view
Out[21]: <DirectView all>
Direct views are also available on the client via indices. The all in the view object's representation indicates that you have not selected a subset of the direct view instances (by indexing or slicing the client), but have instead asked for all of them.
By default, when executing functions in parallel, an asynchronous result object is returned, which is demonstrated in the following code:
In [22]: async_result = view.apply(np.random.rand, 2, 2)
         async_result
Out[22]: <AsyncResult: finished>
When the results are ready, the result object's representation will indicate this by displaying finished. To get the result values, simply call the get method, as follows:
In [23]: values = async_result.get()
         values
Out[23]: [array([[ 0.07792881,  0.21319405],
                 [ 0.20925777,  0.74999169]]),
          array([[ 0.07792881,  0.21319405],
                 [ 0.20925777,  0.74999169]]),
          array([[ 0.07792881,  0.21319405],
                 [ 0.20925777,  0.74999169]]),
          array([[ 0.07792881,  0.21319405],
                 [ 0.20925777,  0.74999169]])]
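If you would like to experiment with this call-now/collect-later pattern without a running cluster, IPython's AsyncResult behaves much like a standard future. The following is an analogy using Python's concurrent.futures module, not the IPython.parallel API itself:

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

# Each submit() returns a future immediately, much as view.apply()
# returns an AsyncResult; result() then blocks until the value is
# ready, like AsyncResult.get().
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(np.random.rand, 2, 2) for _ in range(4)]
    values = [f.result() for f in futures]

print(len(values), values[0].shape)
```

Unlike the cluster, all four tasks here run in one process, but the submit-then-get shape of the code is the same.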
As you might expect, we can also use the results in further parallel calls, as follows:
In [24]: async_result = view.apply(np.linalg.eigvals, values)
         async_result
Out[24]: <AsyncResult: eigvals>
In [25]: async_result.get()
Out[25]: [array([[ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029]]),
          array([[ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029]]),
          array([[ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029]]),
          array([[ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029]])]
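The same eigenvalue computation can, of course, be sanity-checked serially on a single machine. Here is a quick local check with NumPy, using a diagonal matrix whose eigenvalues are known in advance:

```python
import numpy as np

# The eigenvalues of a diagonal matrix are just its diagonal
# entries, which makes this an easy correctness check for eigvals.
matrix = np.array([[2.0, 0.0],
                   [0.0, 3.0]])
eigenvalues = np.linalg.eigvals(matrix)
print(sorted(eigenvalues.real.tolist()))  # the values 2.0 and 3.0
```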
In contrast to the direct view, IPython offers a view that does not bypass the schedulers. Instead, this view executes based on the configured load-balancing scheme.
There are a variety of valid ways to determine where jobs should be assigned in a load-balancing situation. IPython supports several standard schemes and even provides the means for developers to easily add their own. The scheme can be selected either via the scheme argument to ipcontroller, or via the TaskScheduler.scheme_name attribute of a controller config object.
The following built-in routing schemes are provided by IPython:
- lru: Least recently used
- plainrandom: Plain random
- twobin: Two-bin random
- leastload: Least load (default)
- weighted: Weighted two-bin random

To select one of the aforementioned schemes, simply use the ipcontroller command-line tool, as follows:
$ ipcontroller --scheme=twobin
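Equivalently, the scheme can be set in the controller's configuration file via the TaskScheduler.scheme_name attribute mentioned above. A minimal fragment might look like the following (the file name and location depend on your IPython profile, so check the ipcontroller_config.py generated for your setup):

```python
# ipcontroller_config.py (profile-specific; a sketch, not a full config)
c = get_config()

# Route tasks with the two-bin random scheme instead of the
# default least-load scheme.
c.TaskScheduler.scheme_name = 'twobin'
```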
Call the client with the appropriate method to get the load-balanced view, as follows:
In [26]: lb_view = client.load_balanced_view()
         lb_view
Out[26]: <LoadBalancedView None>
The basic usage is the same as the direct view:
In [27]: serial_result = map(lambda x: x**10, range(32))
         parallel_result = lb_view.map(lambda x: x**10, range(32))
In [28]: list(serial_result) == parallel_result.get()
Out[28]: True
The load-balanced view provides a convenient decorator for creating parallel functions. In form, the parallel functions look just like their serially executed cousins. However, with @parallel, the function gains a map method, which will distribute the execution of the function across the cluster, as follows:
In [29]: @lb_view.parallel()
         def f(x):
             return 10.0 * x ** 4

         f.map(range(32)).get()
Out[29]: [0.0, 10.0, 160.0, 810.0, 2560.0, 6250.0, 12960.0, 24010.0,
          40960.0, 65610.0, 100000.0, 146410.0, 207360.0, 285610.0,
          384160.0, 506250.0, 655360.0, 835210.0, 1049760.0, 1303210.0,
          1600000.0, 1944810.0, 2342560.0, 2798410.0, 3317760.0,
          3906250.0, 4569760.0, 5314410.0, 6146560.0, 7072810.0,
          8100000.0, 9235210.0]
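Apart from being distributed, the decorated function computes exactly the same values as plain serial code. A quick cluster-free check of the arithmetic:

```python
def f(x):
    # Same body as the @lb_view.parallel() version above,
    # just without the decorator.
    return 10.0 * x ** 4

results = [f(x) for x in range(32)]
print(results[:5])  # → [0.0, 10.0, 160.0, 810.0, 2560.0]
```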
If you are unfamiliar with IPython, then you may not know that it has a set of predefined functions referred to as magic functions, or simply magics. Some magics apply only to a single line (those with the % prefix), while others apply to an entire cell (those with the %% prefix).
IPython comes with magics that ease the experience of executing code in parallel. If your parallel code requires libraries, you can use the following code to import them on all the engines:
In [30]: with view.sync_imports():
             import numpy
importing numpy on engine(s)
Now, let's execute the code that we used in the section on the direct view, as follows:
In [31]: %px async_result = numpy.random.rand(2, 2)
In [32]: %px numpy.linalg.eigvals(async_result)
Out[0:2]: array([ 1.69123631,  0.0052597 ])
Out[1:2]: array([ 1.4345667 ,  0.15208336])
Out[2:2]: array([ 1.24709664, -0.06577105])
Out[3:2]: array([ 0.39707627,  1.01065811])
Let's use the clustering features of IPython to execute the same job that we did during the implementation of the ZeroMQ pipeline pattern. Go ahead and stop the cluster with four nodes and restart it with eight nodes, as follows:
$ ipcluster start -n 8
Let's get a fresh connection that is aware of the new nodes, as follows:
In [33]: client = Client()
         client.ids
Out[33]: [0, 1, 2, 3, 4, 5, 6, 7]
Next, we're going to do something different: we'll demonstrate working with blocking, synchronous results by explicitly setting a flag. Also, since the estimate_pi function uses an imported module, we're going to have each engine import it, as follows:
In [34]: view = client.direct_view()
         view.block = True
         with view.sync_imports():
             import random
importing random on engine(s)
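The estimate_pi function itself comes from the earlier ZeroMQ pipeline section. For reference, a minimal Monte Carlo sketch of what such a function might look like follows; the name and signature are carried over from that section, but this body is an illustration, not necessarily the exact implementation used there:

```python
import random

def estimate_pi(n):
    """Estimate pi by sampling n random points in the unit square
    and counting how many land inside the quarter circle."""
    n = int(n)  # the caller passes a float such as 1e8 / node_count
    inside = 0
    for _ in range(n):
        x = random.random()
        y = random.random()
        if x * x + y * y <= 1.0:
            inside += 1
    # The quarter circle has area pi/4, so scale the hit ratio by 4.
    return 4.0 * inside / n

print(estimate_pi(1e5))  # roughly 3.14
```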
Now, let's execute the π-estimating function in the new IPython cluster, timing it with the %%time magic function, as follows:
In [35]: %%time
         node_count = 8
         results = view.apply(estimate_pi, 1e8 / node_count)
         pi_est = sum(results) / len(client.ids)
         print("Result: {}".format(pi_est))
Result: 3.1414122399999997
CPU times: user 6.76 s, sys: 624 ms, total: 7.38 s
Wall time: 19.1 s
The preceding code runs faster than our custom multiprocessing ZeroMQ example, and it is almost four times faster than the original serial example that we gave. All things being equal, though, its biggest advantage is that it is much easier to set up. And thanks to the work we did in the previous section, we have an idea of what's going on behind the scenes, since IPython uses ZeroMQ to create and manage its clusters.
In addition to being easy to set up, it also makes it easier to plot the results, as demonstrated in the following code:
In [36]: (figure, axes) = plt.subplots(figsize=(16, 8))
         axes.scatter([x + 1 for x in range(len(results))], results,
                      color=colors[0])
         axes.set_title(r"Estimated values of $\pi$", fontsize=28)
         axes.set_ylabel(r"~$\pi$", fontsize=24)
         axes.set_xlabel("Worker Number", fontsize=20)
         plt.show()
The following plot is the result of the preceding code: