The multiprocessing API was originally designed to mimic the thread API. However, it has evolved, and in recent versions of Python 3 it supports more features more robustly. The multiprocessing library is designed for when CPU-intensive jobs need to happen in parallel and multiple cores are available (given that a four-core Raspberry Pi can currently be purchased for $35, there are usually multiple cores available). Multiprocessing is not useful when the processes spend a majority of their time waiting on I/O (for example, network, disk, database, or keyboard), but it is the way to go for parallel computation.
The multiprocessing module spins up new operating system processes to do the work. On Windows machines, this is a relatively expensive operation; on Linux, processes are implemented in the kernel the same way threads are, so the overhead is limited to the cost of running separate Python interpreters in each process.
Let's try to parallelize a compute-heavy operation using similar constructs to those provided by the threading API:
```python
from multiprocessing import Process, cpu_count
import time
import os


class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        for i in range(200000000):
            pass


if __name__ == '__main__':
    procs = [MuchCPU() for f in range(cpu_count())]
    t = time.time()
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print('work took {} seconds'.format(time.time() - t))
```
This example just ties up the CPU for 200 million iterations. You may not consider this to be useful work, but it's a cold day and I appreciate the heat my laptop generates under such load.
The API should be familiar; we create a subclass of Process (instead of Thread) and override its run method. This method prints out the process ID (a unique number the operating system assigns to each process on the machine) before doing some intense (if misguided) work.
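Subclassing is not the only option: like Thread, Process also accepts a target callable plus an args tuple. The following minimal sketch expresses the same busy loop that way; the function name busy_loop and the much smaller iteration count are illustrative choices, not part of the original example.

```python
import os
from multiprocessing import Process, cpu_count


def busy_loop(iterations):
    """Burn CPU for `iterations` loop steps, reporting which process ran it."""
    print(os.getpid())
    for i in range(iterations):
        pass
    # Process discards this return value; a Pool or Queue is needed to get
    # results back to the parent.
    return iterations


if __name__ == '__main__':
    # Pass the work as a target function instead of overriding run().
    procs = [Process(target=busy_loop, args=(2_000_000,))
             for _ in range(cpu_count())]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # exitcode is 0 for every process whose target returned normally.
    print([p.exitcode for p in procs])
```

The subclass form keeps state and behavior together on the process object; the target form is handy when the work is already a plain function.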
Pay special attention to the if __name__ == '__main__': guard around the module-level code, which prevents that code from running if the module is being imported rather than run as a program. This is good practice in general, but when using multiprocessing on platforms that start processes by launching a fresh interpreter (such as Windows and macOS), it is essential. Behind the scenes, multiprocessing may have to import the module inside the new process in order to execute the run() method. If we allowed the entire module to execute at that point, it would try to start creating new processes recursively; current versions of Python detect this and raise a RuntimeError in the child rather than letting it exhaust the operating system's resources.
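The re-import described above is easy to observe. The following minimal sketch assumes it is saved to a file and run as a script; it explicitly asks for the "spawn" start method through multiprocessing.get_context() so that the behavior is the same on every platform. The names work and main are illustrative.

```python
import multiprocessing
import os

# Module-level code: runs in the parent and, under "spawn", again in each
# child when the child re-imports this file (as a module named __mp_main__).
print('module loaded as', __name__, 'in process', os.getpid())


def work():
    """Runs in the child; returns the child's PID."""
    print('work() running in process', os.getpid())
    return os.getpid()


def main():
    # Explicitly request the "spawn" start method (already the default on
    # Windows and macOS) so the re-import happens on every platform.
    ctx = multiprocessing.get_context('spawn')
    p = ctx.Process(target=work)
    p.start()
    p.join()
    return p.exitcode


if __name__ == '__main__':
    # Without this guard, the child's re-import would call main() again and
    # try to start a process of its own.
    print('child exit code:', main())
```

When run, the "module loaded" line should appear twice with different PIDs, once as __main__ and once as __mp_main__, while main() runs only in the parent.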
We construct one process for each processor core on our machine, then start and join each of those processes. On my 2014-era quad-core laptop, the output looks like this:
```
6987
6988
6989
6990
work took 12.96659541130066 seconds
```
The first four lines are the process IDs printed inside each MuchCPU instance. The last line shows that the 200 million iterations can run in about 13 seconds on my machine. During those 13 seconds, my process monitor indicated that all four of my cores were running at 100 percent.
If we subclass threading.Thread instead of multiprocessing.Process in MuchCPU, the output looks like this:
```
7235
7235
7235
7235
work took 28.577413082122803 seconds
```
This time, the four threads are running inside the same process and take more than twice as long to run. This is the cost of the global interpreter lock; in other languages or implementations of Python, the threaded version would run at least as fast as the multiprocessing version. We might expect it to take four times as long, but remember that many other programs are running on my laptop. In the multiprocessing version, these programs also need a share of the four CPUs. In the threading version, those programs can use the other three CPUs instead.
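For reference, the threaded variant described above might look like the following sketch. It assumes the same busy loop, but passes the iteration count to the constructor so the demonstration can use a smaller workload than the 200 million iterations in the text; time_threads is an illustrative helper name.

```python
import os
import threading
import time
from multiprocessing import cpu_count


class MuchCPU(threading.Thread):
    """The busy loop from the earlier example, run as a thread instead."""

    def __init__(self, iterations):
        super().__init__()
        self.iterations = iterations

    def run(self):
        # Every thread prints the same PID because they share one process.
        print(os.getpid())
        for i in range(self.iterations):
            pass


def time_threads(iterations):
    """Start one thread per core, wait for all of them, return elapsed seconds."""
    threads = [MuchCPU(iterations) for _ in range(cpu_count())]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == '__main__':
    # A much smaller workload than in the text, to keep the demo short.
    print('work took {} seconds'.format(time_threads(5_000_000)))
```

Because of the global interpreter lock, only one of these threads executes Python bytecode at a time, which is why the elapsed time grows with the number of threads instead of staying flat.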
In general, there is no reason to have more processes than there are processors on the computer. There are a few reasons for this:

- Only cpu_count() processes can run simultaneously
- Each process runs its own Python interpreter, so creating one costs time and memory
- Communication between processes is expensive, because every object passed between them has to be pickled

Given these constraints, it makes sense to create at most cpu_count() processes when the program starts and then have them execute tasks as needed. It is not difficult to implement a basic series of communicating processes that does this, but it can be tricky to debug, test, and get right. Of course, Python being Python, we don't have to do all this work because the Python developers have already done it for us in the form of multiprocessing pools.
The primary advantage of pools is that they abstract away the overhead of figuring out what code is executing in the main process and which code is running in the subprocess. As with the threading API that multiprocessing mimics, it can often be hard to remember who is executing what. The pool abstraction restricts the number of places that code in different processes interact with each other, making it much easier to keep track of.
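Under the hood, a pool pickles each argument in the main process and passes it through a pipe to a worker process, which unpickles it, does the work, and sends a pickled result back the same way. All this pickling and passing data into pipes takes time and memory. Therefore, it is ideal to keep the amount and size of data passed into and returned from the pool to a minimum, and it is only advantageous to use the pool if a lot of processing has to be done on the data in question.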
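The cost of that round trip depends on what is being pickled. The following minimal sketch uses the standard pickle module directly to show how the size of the payload grows with the data, and that some objects (such as lambdas) cannot be pickled at all and therefore cannot be sent to a worker. The helper names pickled_size and is_picklable are illustrative, and a pool's internal protocol may differ in detail from a plain pickle.dumps() call.

```python
import pickle


def pickled_size(obj):
    """Number of bytes pickle.dumps() produces for obj."""
    return len(pickle.dumps(obj))


def is_picklable(obj):
    """Return True if obj can be pickled, and so passed to a pool worker."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == '__main__':
    # A single integer costs a handful of bytes; a big list costs far more.
    print(pickled_size(12345))
    print(pickled_size(list(range(100_000))))
    # Lambdas cannot be pickled, so they cannot be sent to a worker process.
    print(is_picklable(lambda x: x * 2))
```

Functions themselves are pickled by reference (module plus name), which is why pool workers need module-level functions rather than lambdas or nested functions.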
Armed with this knowledge, the code to make all this machinery work is surprisingly simple. Let's look at the problem of calculating all the prime factors of a list of random numbers. This is a common and expensive part of a variety of cryptography algorithms (not to mention attacks on those algorithms!). It requires years of processing power to crack the extremely large numbers used to secure your bank accounts. The following implementation, while readable, is not at all efficient, but that's OK because we want to see it using lots of CPU time:
```python
import random
from multiprocessing.pool import Pool


def prime_factor(value):
    factors = []
    for divisor in range(2, value - 1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            # divisor splits value: factor both parts recursively
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
    else:
        # no divisor was found, so value is prime
        factors = [value]
    return factors


if __name__ == '__main__':
    pool = Pool()

    to_factor = [
        random.randint(100000, 50000000) for i in range(20)
    ]
    results = pool.map(prime_factor, to_factor)
    for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))

    # Release the worker processes once we are done with them.
    pool.close()
    pool.join()
```
Let's focus on the parallel processing aspects, as the brute-force recursive algorithm for calculating factors is pretty clear. We first construct a multiprocessing pool instance. By default, this pool creates a separate process for each of the CPU cores in the machine it is running on.
The map method accepts a function and an iterable. The pool pickles each of the values in the iterable and passes it into an available process, which executes the function on it. When that process is finished doing its work, it pickles the resulting list of factors and passes it back to the pool. Once all the worker processes have finished (which could take some time), the results list is passed back to the original process, which has been waiting patiently for all this work to complete.
It is often more useful to use the similar map_async method, which returns immediately even though the processes are still working. In that case, the results variable would not be a list of values, but a promise to return a list of values later by calling results.get(). This promise object (an AsyncResult) also has methods like ready(), which checks whether all the results are in yet, and wait(), which blocks until they are.
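The map_async workflow can be sketched by reworking the factoring example. This is a minimal sketch, assuming smaller random numbers than before so it finishes quickly; the polling loop with wait(timeout=...) stands in for whatever other work the main process might do while it waits.

```python
import random
from multiprocessing.pool import Pool


def prime_factor(value):
    """Same brute-force factoring as the earlier example."""
    factors = []
    for divisor in range(2, value - 1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
    else:
        factors = [value]
    return factors


if __name__ == '__main__':
    # Smaller numbers than before so the sketch finishes quickly.
    to_factor = [random.randint(100000, 5000000) for i in range(20)]
    with Pool() as pool:
        async_result = pool.map_async(prime_factor, to_factor)
        # map_async has already returned; the main process is free until get().
        while not async_result.ready():
            print('still working...')
            async_result.wait(timeout=0.5)  # block for at most half a second
        results = async_result.get()  # a list, in the same order as to_factor
    for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))
```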
Alternatively, if we don't know all the values we want to get results for in advance, we can use the apply_async method to queue up a single job. If the pool has a process that isn't already working, it will start immediately; otherwise, it will hold onto the task until there is a free process available.
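Queuing jobs one at a time with apply_async might look like the following minimal sketch. The functions square and report are hypothetical; the optional callback argument is called in the main process with each result as soon as that job finishes.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def report(result):
    # Runs in the main process (in the pool's result-handling thread),
    # so it should be quick.
    print('got', result)


if __name__ == '__main__':
    with Pool() as pool:
        pending = []
        # Jobs are queued one at a time, e.g. as new values arrive.
        for x in range(5):
            pending.append(pool.apply_async(square, (x,), callback=report))
        # get() blocks until that particular job has finished.
        print([r.get() for r in pending])  # [0, 1, 4, 9, 16]
```

The "got" lines may appear in any order, but collecting with get() in submission order keeps the final list ordered.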
Pools can also be closed, which refuses to take any further tasks but processes everything currently in the queue, or terminated, which goes one step further and stops the worker processes immediately, abandoning both the jobs still on the queue and any jobs that are currently running.
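The difference between the two shutdown styles can be seen with a minimal sketch built around a hypothetical nap() function. close() followed by join() waits for all queued work, and rejects new submissions with a ValueError; terminate() returns almost at once even though the queued jobs would each take five seconds.

```python
import time
from multiprocessing import Pool


def nap(seconds):
    """Sleep for `seconds`, then return the value we slept for."""
    time.sleep(seconds)
    return seconds


if __name__ == '__main__':
    # close(): queued work still runs, but new submissions are refused.
    pool = Pool(2)
    results = [pool.apply_async(nap, (0.2,)) for _ in range(4)]
    pool.close()
    try:
        pool.apply_async(nap, (0,))
    except ValueError as exc:
        print('rejected after close():', exc)
    pool.join()  # returns once all four queued naps have finished
    print([r.get() for r in results])  # [0.2, 0.2, 0.2, 0.2]

    # terminate(): workers are stopped at once, so these 5-second naps
    # (queued or already running) are abandoned.
    pool = Pool(2)
    start = time.perf_counter()
    for _ in range(4):
        pool.apply_async(nap, (5,))
    pool.terminate()
    pool.join()
    print('terminated after {:.1f} seconds'.format(time.perf_counter() - start))
```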
If we need more control over communication between processes, we can use a Queue. Queue data structures are useful for sending messages from one process into one or more other processes. Any picklable object can be sent into a Queue, but remember that pickling can be a costly operation, so keep such objects small. To illustrate queues, let's build a little search engine for text content that stores all relevant entries in memory.
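Before the search engine, here is the basic Queue pattern on its own: a minimal sketch in which the parent sends numbers to a worker, uses None as a "no more work" sentinel, and reads the worker's answers from a second queue. The function name doubler is hypothetical.

```python
from multiprocessing import Process, Queue


def doubler(inbox, outbox):
    """Read numbers from inbox until None arrives; send their doubles to outbox."""
    for item in iter(inbox.get, None):
        outbox.put(item * 2)


if __name__ == '__main__':
    inbox, outbox = Queue(), Queue()
    worker = Process(target=doubler, args=(inbox, outbox))
    worker.start()
    for n in range(5):
        inbox.put(n)
    inbox.put(None)  # sentinel: tells the worker to stop
    # Read all results before join() so the worker can flush its queue.
    print([outbox.get() for _ in range(5)])  # [0, 2, 4, 6, 8]
    worker.join()
```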
This is not the most sensible way to build a text-based search engine, but I have used this pattern to query numerical data that needed to use CPU-intensive processes to construct a chart that was then rendered to the user.
This particular search engine scans all files in the current directory in parallel. A process is constructed for each core on the CPU. Each of these is instructed to load some of the files into memory. Let's look at the function that does the loading and searching:
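```python
def search(paths, query_q, results_q):
    # Load every line of this process's share of the files into memory.
    lines = []
    for path in paths:
        # errors='replace' keeps undecodable (e.g. binary) files from
        # crashing the worker; the file is closed when the block ends.
        with path.open(errors='replace') as f:
            lines.extend(l.strip() for l in f)

    # Answer queries until the None sentinel arrives.
    query = query_q.get()
    while query:
        results_q.put([l for l in lines if query in l])
        query = query_q.get()
```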
Remember, this function is run in a different process (in fact, it is run in cpu_count() different processes) from the main process. It is passed a list of path objects and two multiprocessing.Queue objects: one for incoming queries and one to send outgoing results. These queues have a similar interface to the Queue class we discussed in Chapter 6, Python Data Structures. However, they are doing extra work to pickle the data in the queue and pass it into the subprocess over a pipe. These two queues are set up in the main process and passed through the pipes into the search function inside the child processes.
The search code is pretty dumb, both in terms of efficiency and of capabilities; it loops over every line stored in memory and puts the matching ones in a list. The list is placed on a queue and passed back to the main process.
Let's look at the main process, which sets up these queues:
```python
if __name__ == '__main__':
    from multiprocessing import Process, Queue, cpu_count
    # The standard library's pathlib stands in for the third-party path.py
    # package; its Path objects also provide the open() used by search().
    from pathlib import Path

    cpus = cpu_count()
    pathnames = [f for f in Path('.').iterdir() if f.is_file()]
    paths = [pathnames[i::cpus] for i in range(cpus)]
    query_queues = [Queue() for p in range(cpus)]
    results_queue = Queue()

    search_procs = [
        Process(target=search, args=(p, q, results_queue))
        for p, q in zip(paths, query_queues)
    ]
    for proc in search_procs:
        proc.start()
```
For easier description, let's assume cpu_count() returns four. Notice how the import statements are placed inside the if guard? This is a small optimization that prevents them from being imported in each subprocess (where they aren't needed) on certain operating systems. We list all the paths in the current directory and then split the list into four approximately equal parts. We also construct a list of four Queue objects to send data into each subprocess. Finally, we construct a single results queue; this is passed into all four of the subprocesses. Each of them can put data into the queue and it will be aggregated in the main process.
Now let's look at the code that makes a search actually happen:
```python
    for q in query_queues:
        q.put("def")
        q.put(None)  # Signal process termination

    for i in range(cpus):
        for match in results_queue.get():
            print(match)
    for proc in search_procs:
        proc.join()
```
This code performs a single search for "def" (because it's a common phrase in a directory full of Python files!). In a more production-ready system, we would probably hook a socket up to this search code. In that case, we'd have to change the inter-process protocol so that the message coming back on the return queue contained enough information to identify which of many queries the results were attached to.
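One way to make that protocol change is to tag every query with an ID and echo the ID back with its results. The following is a minimal sketch of that idea, not the original program: this hypothetical variant of search() works on in-memory lists of lines instead of files so it is self-contained, and the sample lines and query IDs are made up.

```python
from collections import defaultdict
from multiprocessing import Process, Queue


def search(lines, query_q, results_q):
    """Hypothetical variant: queries arrive as (query_id, text) pairs and
    every result message echoes the query_id back."""
    for query_id, text in iter(query_q.get, None):
        results_q.put((query_id, [l for l in lines if text in l]))


if __name__ == '__main__':
    # Made-up sample data: one chunk of lines per worker process.
    chunks = [['def foo():', 'x = 1'], ['class Bar:', 'def baz(self):']]
    query_queues = [Queue() for _ in chunks]
    results_queue = Queue()
    procs = [Process(target=search, args=(c, q, results_queue))
             for c, q in zip(chunks, query_queues)]
    for p in procs:
        p.start()

    queries = {1: 'def', 2: 'class'}
    for q in query_queues:
        for query_id, text in queries.items():
            q.put((query_id, text))
        q.put(None)  # Signal process termination

    # Each worker answers every query once.
    matches = defaultdict(list)
    for _ in range(len(chunks) * len(queries)):
        query_id, found = results_queue.get()
        matches[query_id].extend(found)
    for p in procs:
        p.join()
    print(dict(matches))
```

Because workers finish in an unpredictable order, the matches within each query ID may arrive in any order; the ID is what lets the main process sort them out.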
This use of queues is actually a local version of what could become a distributed system. Imagine if the searches were being sent out to multiple computers and then recombined. We won't discuss it here, but the multiprocessing module includes a manager class that can take a lot of the boilerplate out of the preceding code. There is even a version of the multiprocessing.Manager that can manage subprocesses on remote systems to construct a rudimentary distributed application. Check the Python multiprocessing documentation if you are interested in pursuing this further.
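As a small taste of what a manager offers, the following minimal sketch uses multiprocessing.Manager() to share a dictionary between several processes without any explicit queues. The function record is hypothetical. The remote variant mentioned above builds on multiprocessing.managers.BaseManager with a network address and authentication key; it is not shown here.

```python
from multiprocessing import Manager, Process


def record(shared, key, value):
    """Store one result in a dict-like object supplied by the caller."""
    shared[key] = value


if __name__ == '__main__':
    # The manager runs a server process that owns the real dict; the
    # workers receive proxies that forward every operation to it.
    with Manager() as manager:
        shared = manager.dict()
        procs = [Process(target=record, args=(shared, i, i * i))
                 for i in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(sorted(shared.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9)]
```

Every proxy operation is still a pickled round trip to the manager process, so the convenience does not remove the communication cost discussed below.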
As threads do, multiprocessing also has problems, some of which we have already discussed. There is no best way to do concurrency; this is especially true in Python. We always need to examine the parallel problem to figure out which of the many available solutions is the best one for that problem. Sometimes, there is no best solution.
In the case of multiprocessing, the primary drawback is that sharing data between processes is very costly. As we have discussed, all communication between processes, whether by queues, pipes, or a more implicit mechanism, requires pickling the objects. Excessive pickling quickly dominates processing time. Multiprocessing works best when relatively small objects are passed between processes and a tremendous amount of work needs to be done on each one. On the other hand, if no communication between processes is required, there may not be any point in using the module at all; we can spin up four separate Python processes and use them independently.
The other major problem with multiprocessing is that, like threads, it can be hard to tell which process a variable or method is being accessed in. In multiprocessing, each process has its own copy of the program's memory, so if a child process assigns to a module-level variable, only the child's copy changes while the parent and any other processes keep the old value. This is really confusing to maintain, so don't do it.