Clustering with IPython

As explained in the IPython documentation for parallel computing, IPython has built-in support for parallelism. This came as a result of the architectural overhaul that IPython received when the project finished migrating to ZeroMQ in 2011. The architecture that resulted can be summarized with the following components, all of which are present in the IPython.parallel package:

  • The IPython engine: This is a Python interpreter that accepts Python commands over a network connection. Multiple engines form the basis of IPython's parallel computing capabilities.
  • The IPython hub: This is the process that keeps track of engine connections, schedulers, clients, task requests, and results. Its primary purpose is to facilitate queries against the cluster's state.
  • The IPython schedulers: All actions that can be performed on an engine go through a scheduler. The schedulers also provide a fully asynchronous interface to a set of engines.
  • The IPython client: This is the interface for developers who wish to access a set of engines. It is what we will use in the subsequent code examples (in particular, its different views).

The following figure shows us the IPython architecture:

[Figure: The IPython parallel computing architecture]

Thanks to the aforementioned architecture, IPython supports the following different styles of parallelism:

  • Single program, multiple data
  • Multiple programs, multiple data
  • Passing messages by using Message Passing Interface (MPI)
  • Task farming
  • Parallel data
  • Combinations of the aforementioned approaches
  • Custom user-defined approaches

Practically speaking, this allows IPython users to tackle the following use cases:

  • Quickly parallelize algorithms that are embarrassingly parallel by using a number of simple approaches. Many simple things can be parallelized interactively in one or two lines of code.
  • Steer traditional MPI applications on a supercomputer from an IPython session on your laptop.
  • Analyze and visualize large datasets (that may be remote and/or distributed) interactively by using IPython and tools such as matplotlib or TVTK.
  • Develop, test, and debug new parallel algorithms (that may use MPI) interactively.
  • Tie together multiple MPI jobs running on different systems into one giant distributed and parallel system.
  • Start a parallel job on your cluster and then have a remote collaborator connect to it and pull back data into their local IPython session for plotting and analysis.
  • Run a set of tasks on a set of CPUs by using dynamic load balancing.

Getting started

To use IPython for parallel computing, you need to initialize an instance of the controller and one or more instances of the engine. Initially, it is best to simply start a controller and engines on a single host by using the ipcluster command. To initialize a controller and 4 engines on your localhost, switch to a terminal window with this notebook's virtual environment activated and execute the following command:

$ ipcluster start -n 4

This will run the ipcluster app in the foreground and show the following output:

2015-04-28 09:50:29.584 [IPClusterStart] Using existing profile dir: '/Users/oubiwann/.ipython/profile_default'
2015-04-28 09:50:29.587 [IPClusterStart] Starting ipcluster with [daemon=False]
2015-04-28 09:50:29.589 [IPClusterStart] Creating pid file: /Users/oubiwann/.ipython/profile_default/pid/ipcluster.pid
2015-04-28 09:50:29.589 [IPClusterStart] Starting Controller with LocalControllerLauncher
2015-04-28 09:50:30.593 [IPClusterStart] Starting 4 Engines with LocalEngineSetLauncher
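
Running in the foreground is convenient for watching the logs, but it does tie up the terminal. If you would rather run the cluster in the background, ipcluster can daemonize itself and be stopped later (a variation, not required for what follows; the flags can be confirmed with ipcluster start --help for your version):

$ ipcluster start -n 4 --daemonize
$ ipcluster stop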

With an IPython cluster running, we're ready to start using it. There are several ways to interact with an IPython cluster. The connection itself is made with a client object; the client object, in turn, offers views for the actual interaction with the cluster. The following views are available:

  • The direct view
  • The load-balanced view
  • The IPython parallel magic functions

Though not a view per se, the parallel magic functions that IPython provides for interacting with clusters act very much like one, so they are listed here as well. In an IPython Notebook, the parallel magic functions will often be what you want to use. For both the direct and the load-balanced views, you will need to create a cluster client, as follows:

In [20]: from IPython.parallel import Client
         client = Client() 
         client.ids 

The client.ids attribute holds the IDs of each IPython cluster engine started by the preceding call to ipcluster; with the four engines started earlier, it should be [0, 1, 2, 3].
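
In passing, if your cluster was started under a non-default IPython profile, the Client constructor accepts a hint about where to find the connection files; a minimal sketch (the profile name mycluster is hypothetical):

client = Client(profile='mycluster')  # connect using the named profile's connection files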

The direct view

The direct view is so called because the DirectView object offers an interface to the cluster that doesn't go through the schedulers; instead, it offers direct, manual execution control.

Here's how you create a direct view from the client object:

In [21]: view = client.direct_view()
         view
Out[21]: <DirectView all>

Direct views are also available on the client via indexing. The all in the view object's representation indicates that you have not selected a subset of the engines (by indexing or slicing the client), but have instead asked for all of them.
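
For example, here is a minimal sketch of selecting engine subsets by indexing and slicing the client (the engine IDs assume the four-engine cluster started earlier):

view0 = client[0]     # a DirectView on a single engine
evens = client[::2]   # a DirectView on engines 0 and 2
everyone = client[:]  # equivalent to asking for all of the engines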

By default, when executing functions in parallel, an asynchronous result object is returned, which is demonstrated in the following code:

In [22]: async_result = view.apply(np.random.rand, 2, 2)
         async_result
Out[22]: <AsyncResult: finished>

When the results are ready, the result object's representation signals this by displaying finished. To get the result values, simply call the get method, as follows:

In [23]: values = async_result.get()
         values
Out[23]: [array([[ 0.07792881,  0.21319405],
                 [ 0.20925777,  0.74999169]]),
          array([[ 0.07792881,  0.21319405],
                 [ 0.20925777,  0.74999169]]),
          array([[ 0.07792881,  0.21319405],
                 [ 0.20925777,  0.74999169]]),
          array([[ 0.07792881,  0.21319405],
                 [ 0.20925777,  0.74999169]])]
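
Before moving on, note that the asynchronous result object offers more than get; here is a minimal sketch of the polling-style methods that it borrows from multiprocessing's AsyncResult (the variable names are ours):

ar = view.apply(np.random.rand, 2, 2)
ar.ready()          # True once every engine has replied
ar.wait(timeout=1)  # block for at most one second
values = ar.get()   # fetch the results; re-raises any remote error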

As you might expect, we can also use the results in further parallel calls, as follows:

In [24]: async_result = view.apply(np.linalg.eigvals, values)
         async_result
Out[24]: <AsyncResult: eigvals>
In [25]: async_result.get()
Out[25]: [array([[ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029]]),
          array([[ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029]]),
          array([[ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029]]),
          array([[ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029],
                 [ 0.01706021,  0.81086029]])]
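
Before we leave the direct view, note that it can also move data explicitly between your session and the engines' namespaces; a minimal sketch of push and pull (the variable name a is ours):

view.push(dict(a=10))  # set a=10 in every engine's namespace
view.pull('a')         # an AsyncResult yielding [10, 10, 10, 10]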

The load-balanced view

In contrast to the direct view, IPython also offers a view that does not bypass the schedulers. Instead, this view executes tasks according to the configured load-balancing scheme.

There are a variety of valid ways to determine where jobs should be assigned in a load-balancing situation. IPython supports several standard schemes and even provides the means by which developers can easily add their own. The scheme can be selected either via the scheme argument to ipcontroller or via the TaskScheduler.scheme_name attribute of a controller config object.

The following built-in routing schemes are provided by IPython:

  • lru: Least recently used
  • plainrandom: Plain random
  • twobin: Two-bin random
  • leastload: Least load (default)
  • weighted: Weighted two-bin random

To select one of the aforementioned schemes, simply use the ipcontroller command-line tool, as follows:

$ ipcontroller --scheme=twobin
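
Alternatively, the same setting can live in your profile's controller configuration file; a minimal sketch (the path assumes the default profile):

# ~/.ipython/profile_default/ipcontroller_config.py
c = get_config()
c.TaskScheduler.scheme_name = 'twobin'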

Call the client with the appropriate method to get the load-balanced view, as follows:

In [26]: lb_view = client.load_balanced_view()
         lb_view
Out[26]: <LoadBalancedView None>

The basic usage is the same as the direct view:

In [27]: serial_result = map(lambda x: x**10, range(32))
         parallel_result = lb_view.map(lambda x: x**10, range(32))
In [28]: list(serial_result) == parallel_result.get()
Out[28]: True
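
As with apply on the direct view, map returns an asynchronous result by default. If you would rather receive the values directly, the call can be asked to block; a minimal sketch:

squares = lb_view.map(lambda x: x ** 2, range(8), block=True)
# squares is now a plain list: [0, 1, 4, 9, 16, 25, 36, 49]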

The load-balanced view provides a convenient decorator for creating parallel functions. In form, parallel functions look just like their serially executed cousins. However, when decorated with @parallel, a function gains a map method that distributes its execution across the cluster, as follows:

In [29]: @lb_view.parallel()
         def f(x):
             return 10.0*x**4

         f.map(range(32)).get()
Out[29]: [0.0,
          10.0,
          160.0,
          810.0,
          2560.0,
          6250.0,
          12960.0,
          24010.0,
          40960.0,
          65610.0,
          100000.0,
          146410.0,
          207360.0,
          285610.0,
          384160.0,
          506250.0,
          655360.0,
          835210.0,
          1049760.0,
          1303210.0,
          1600000.0,
          1944810.0,
          2342560.0,
          2798410.0,
          3317760.0,
          3906250.0,
          4569760.0,
          5314410.0,
          6146560.0,
          7072810.0,
          8100000.0,
          9235210.0]

The parallel magic functions

If you are unfamiliar with IPython, you may not know that it has a set of predefined functions referred to as magic functions, or simply magics. Some of these apply only to a single line (the ones with the % prefix), while others apply to an entire cell (the ones with the %% prefix).

IPython comes with magics that ease the experience of executing code in parallel. If your parallel code requires libraries, you can use the following code to import them on all of the engines:

In [30]: with view.sync_imports():
             import numpy
importing numpy on engine(s)

Now, let's execute the code that we used in the section on the direct view, as follows:

In [31]: %px async_result = numpy.random.rand(2, 2)
In [32]: %px numpy.linalg.eigvals(async_result)
         Out[0:2]: array([ 1.69123631,  0.0052597 ])
         Out[1:2]: array([ 1.4345667 ,  0.15208336])
         Out[2:2]: array([ 1.24709664, -0.06577105])
         Out[3:2]: array([ 0.39707627,  1.01065811])
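
The %px line magic used here has a cell-magic sibling as well; for example (a minimal sketch), %%px runs an entire notebook cell on every engine:

%%px
# the whole cell executes on each engine;
# each engine evaluates its own process ID
import os
os.getpid()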

An example – estimating the value of π

Let's use the clustering features of IPython to execute the same job that we did when implementing the ZeroMQ pipeline pattern. Go ahead and stop the four-engine cluster (press Ctrl + C in its terminal window, or use ipcluster stop if you daemonized it) and restart it with eight engines, as follows:

$ ipcluster start -n 8

Let's get a fresh connection that is aware of the new nodes, as follows:

In [33]: client = Client() 
         client.ids
Out[33]: [0, 1, 2, 3, 4, 5, 6, 7]

Next, we're going to do something different: we'll demonstrate working with blocking, synchronous results by explicitly setting a flag. Also, since the estimate_pi function uses an imported module, we're going to have each engine import it, as follows:

In [34]: view = client.direct_view()
         view.block = True
         with view.sync_imports():
             import random
         importing random on engine(s)
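
The estimate_pi function here is the one we wrote for the ZeroMQ pipeline example. If you are jumping straight into this section, the following is a minimal Monte Carlo sketch along the same lines (the exact body in the earlier section may differ; note the int cast, since we pass it a float below):

def estimate_pi(n):
    # Monte Carlo estimate: the fraction of random points in the unit
    # square that land inside the quarter circle approximates pi/4
    n = int(n)
    inside = 0
    for _ in range(n):
        x = random.random()
        y = random.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / n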

Now, let's execute the π-estimating function in the new IPython cluster, timing it with the %%time magic function, as follows:

In [35]: %%time
         node_count = 8
         results = view.apply(estimate_pi, 1e8 / node_count)
         pi_est = sum(results)/len(client.ids)
         print("Result: {}".format(pi_est))
         Result: 3.1414122399999997
         CPU times: user 6.76 s, sys: 624 ms, total: 7.38 s
         Wall time: 19.1 s

The preceding code runs faster than our custom multiprocessing ZeroMQ example and almost four times faster than the original serial example that we gave; all things being equal, though, its biggest advantage is that it's much easier to set up. And thanks to the work we did in the previous section, we have a good idea of what's going on behind the scenes: IPython uses ZeroMQ to create and manage its clusters.

In addition to being easier to set up, it also makes plotting the results easier. This is demonstrated in the following code:

In [36]: (figure, axes) = plt.subplots(figsize=(16, 8))
         axes.scatter([x + 1 for x in range(len(results))],
                      results, color=colors[0])
         axes.set_title("Estimated values of $pi$", fontsize=28)
         axes.set_ylabel("~$pi$", fontsize=24)
         axes.set_xlabel("Worker Number", fontsize=20)
         plt.show()

The following plot is the result of the preceding code:

[Figure: Scatter plot of the estimated value of π reported by each worker]