Chapter 9. Processes, Threads, and Synchronization

Introduction

Credit: Greg Wilson, Third Bit

Thirty years ago, in his classic The Mythical Man-Month: Essays on Software Engineering (Addison-Wesley), Fred Brooks drew a distinction between accidental and intrinsic complexity. Languages such as English and C++, with their inconsistent rules, exceptions, and special cases, are examples of the former: they make communication and programming harder than they need to be. Concurrency, on the other hand, is a prime example of the latter. Most people have to struggle to keep one chain of events straight in their minds; keeping track of two, three, or a dozen, plus all of their possible interactions, is just plain hard.

Computer scientists began studying ways of running multiple processes safely and efficiently in a single physical address space in the mid-1960s. Since then, a rich theory has been developed in which assertions about the behavior of interacting processes can be formalized and proved, and entire languages devoted to concurrent and parallel programming have been created. Foundations of Multithreaded, Parallel, and Distributed Programming, by Gregory R. Andrews (Addison-Wesley), is not only an excellent introduction to this theory, but also contains a great deal of historical information tracing the development of major ideas.

Over the past 20 years, opportunity and necessity have conspired to make concurrency a part of programmers’ everyday lives. The opportunity is for greater speed, which comes from the growing availability of multiprocessor machines. In the early 1980s, these were expensive curiosities; today, many programmers have dual-processor workstations on their desks and four-way or eight-way servers in the back room. If a calculation can be broken down into independent (or nearly independent) pieces, such machines can potentially solve them two, four, or eight times faster than their uniprocessor equivalents. While the potential gains from this approach are limited, it works well for problems as diverse as image processing, serving HTTP requests, and recompiling multiple source files.

The necessity for concurrent programming comes from GUIs and network applications. Graphical interfaces often need to appear to be doing several things at once, such as displaying images while scrolling ads across the bottom of the screen. While it is possible to do the necessary interleaving manually, it is much simpler to code each operation on its own and let the underlying operating system decide on a concrete order of operations. Similarly, network applications often have to listen on several sockets at once or send data on one channel while receiving data on another.

Broadly speaking, operating systems give programmers two kinds of concurrency. Processes run in separate logical address spaces that are protected from each other. Using concurrent processing for performance purposes, particularly in multiprocessor machines, is more attractive with threads, which execute simultaneously within the same program, in the same address space, without being protected from each other. The lack of mutual protection allows lower overhead and easier and faster communication, particularly because of the shared address space. Since all threads run code from the same program, no special security risks are caused by the lack of mutual protection, any more than the risks in a single-threaded program. Thus, concurrency used for performance purposes is most often focused on adding threads to a single program.

However, adding threads to a Python program to speed it up is often not a successful strategy. The reason is the Global Interpreter Lock (GIL), which protects Python’s internal data structures. This lock must be held by a thread before the thread can safely access Python objects. Without the lock, even simple operations (such as incrementing an integer) could fail. Therefore, only the thread with the GIL can manipulate Python objects or call Python/C API functions.

To make life easier for programmers, the interpreter releases and reacquires the lock every 100 bytecode instructions (a value that can be changed using sys.setcheckinterval). The lock is also released and reacquired around I/O operations, such as reading or writing a file, so that other threads can run while the thread that requests the I/O is waiting for the I/O operation to complete. However, effective performance-boosting exploitation of multiple processors from multiple pure-Python threads of the same process is just not in the cards. Unless the CPU performance bottlenecks in your Python application are in C-coded extensions that release the GIL, you will not observe substantial performance increases by moving your multithreaded application to a multiprocessor machine.

However, threading is not just about performance on multiprocessor machines. A GUI can’t know when the user will press a key or move the mouse, and an HTTP server can’t know which datagram will arrive next. Handling each stream of events with a separate control thread is therefore often the simplest way to cope with this unpredictability, even on single-processor machines, and when high throughput is not an overriding concern. Of course, event-driven programming can often be used in these kinds of applications as well, and Python frameworks such as asyncore and Twisted are proof that this approach can often deliver excellent performance with complexity that, while different from that inherent in multithreading, is not necessarily any more difficult to deal with.

The standard Python library allows programmers to approach multithreaded programming at two different levels. The core module, thread, is a thin wrapper around the basic primitives that any threading library must provide. Three of these primitives are used to create, identify, and end threads; others are used to create, test, acquire, and release simple mutual-exclusion locks (or binary semaphores). As the recipes in this section demonstrate, programmers should avoid using these primitives directly, and should instead use the tools included in the higher-level threading module, which is substantially more programmer-friendly and has similar performance characteristics.

Whether you use thread or threading, some underlying aspects of Python’s threading model stay the same. The GIL, in particular, works just the same either way. The crucial advantage of the GIL is that it makes it much easier to code Python extensions in C: unless your C extension explicitly releases the GIL, you know thread switches won’t happen until your C code calls back into Python code. This advantage can be really important when your extension makes available to Python some underlying C library that isn’t thread-safe. If your C code is thread-safe, though, you can and should release the GIL around stretches of computational or I/O operations that can last for a substantial time without needing to make Python C API calls; when you do this, you make it possible for Python programs using your C extension to take advantage of more than one processor from multiple threads within the same process. Make sure you acquire the GIL again before calling any Python C API entry point, though!

Any time your code wants to access a data structure that is shared among threads, you may have to wonder whether a given operation is atomic, meaning that no thread switch can happen during the operation. In general, anything with multiple bytecodes is not atomic, since a thread switch might always happen between one bytecode and the next (you can use the standard library function dis.dis to disassemble Python code into bytecodes). Moreover, even a single bytecode is not atomic, if it can call back to arbitrary Python code (e.g., because that bytecode can end up executing a Python-coded special method). When in doubt, it is most prudent to assume that whatever is giving you doubts is not atomic: so, reduce to the bare minimum the data structures accessed by more than one thread (except for instances of Queue.Queue, a class that is specifically designed to be thread-safe!), and make sure you protect with locks any access to any such structures that remain.

Almost invariably, the proper idiom to use some lock is:

somelock.acquire( )
try:
   #operations needing the lock (keep to a minimum!)
finally:
    somelock.release( )

The try/finally construct ensures the lock will be released even if some exception happens in the code in the try clause. Accidentally failing to release a lock, due to some unforeseen exception, could soon make all of your application come to a grinding halt. Also, be careful acquiring more than one lock in sequence; if you really truly need to do such multiple acquisitions, make sure all possible paths through the code acquire the various locks in the same sequence. Otherwise, you’re likely sooner or later to enter the disaster case in which two threads are each trying to acquire a lock held by the other—a situation known as deadlock, which does mean that your program is as good as dead.

The most important elements of the threading module are classes that represent threads and various high-level synchronization constructs. The Thread class represents a separate control thread; it can be told what to do by passing a callable object to its constructor, or, alternatively, by overriding its run method. One thread can start another by calling its start method, and wait for it to complete by calling join. Python also supports daemon threads, which do background processing until all of the nondaemon threads in the program exit and then shut themselves down automatically.

The synchronization constructs in the threading module include locks, reentrant locks (which a single thread can safely relock many times without deadlocking), counting semaphores, conditions, and events. Events can be used by one thread to signal others that something interesting has happened (e.g., that a new item has been added to a queue, or that it is now safe for the next thread to modify a shared data structure). The documentation that comes with Python, specifically the Library Reference manual, describes each of these classes in detail.

The relatively low number of recipes in this chapter, compared to some other chapters in this cookbook, reflects both Python’s focus on programmer productivity (rather than absolute performance) and the degree to which other packages (such as httplib and wxPython) hide the unpleasant details of concurrency in important application areas. This relative scarcity also reflects many Python programmers’ tendencies to look for the simplest way to solve any particular problem, which complex threading rarely is.

However, this chapter’s brevity may also reflect the Python community’s underappreciation of the potential of simple threading, when used appropriately, to simplify a programmer’s life. The Queue module in particular supplies a delightfully self-contained (and yet extensible and customizable!) synchronization and cooperation structure that can provide all the interthread supervision services you need. Consider a typical program, which accepts requests from a GUI (or from the network). As a “result” of such requests, the program will often find itself faced with the prospect of having to perform a substantial chunk of work. That chunk might take so long to perform all at once that, unless some precautions are taken, the program would appear unresponsive to the GUI (or network).

In a purely event-driven architecture, it may take considerable effort on the programmer’s part to slice up such a hefty work-chunk into slices of work thin enough that each slice can be performed in idle time, without ever giving the appearance of unresponsiveness. In cases such as this one, just a dash of multithreading can help considerably. The main thread pushes a work request describing the substantial chunk of background work onto a dedicated Queue instance, then goes back to its task of making the program’s interface responsive at all times.

At the other end of the Queue, a pool of daemonic worker threads await, each ready to peel a work request off the Queue and run it straight through. This kind of overall architecture combines event-driven and multithreaded approaches in the overarching ideal of simplicity and is thus maximally Pythonic. You may need just a little bit more work if the result of a worker thread’s efforts must be presented again to the main thread (via another Queue, of course), which is normally the case with GUIs. If you’re willing to cheat just a little, and use polling for the mostly event-driven main thread to access the result Queue back from the daemonic worker threads. See Recipe 11.9, to get an idea of how simple that little bit of work can be.

9.1. Synchronizing All Methods in an Object

Credit: André Bjärb, Alex Martelli, Radovan Chytracek

Problem

You want to share an object among multiple threads, but, to avoid conflicts, you need to ensure that only one thread at a time is inside the object—possibly excepting some methods for which you want to hand-tune locking behavior.

Solution

Java offers such synchronization as a built-in feature, while in Python you have to program it explicitly by wrapping the object and its methods. Wrapping is so general and useful that it deserves to be factored out into general tools:

def wrap_callable(any_callable, before, after):
    ''' wrap any callable with before/after calls '''
    def _wrapped(*a, **kw):
        before( )
        try:
            return any_callable(*a, **kw)
        finally:
            after( )
    # In 2.4, only: _wrapped._ _name_ _ = any_callable._ _name_ _
    return _wrapped
import inspect
class GenericWrapper(object):
    ''' wrap all of an object's methods with before/after calls '''
    def _ _init_ _(self, obj, before, after, ignore=( )):
        # we must set into _ _dict_ _ directly to bypass _ _setattr_ _; so,
        # we need to reproduce the name-mangling for double-underscores
        clasname = 'GenericWrapper'
        self._ _dict_ _['_%s_ _methods' % clasname] = {  }
        self._ _dict_ _['_%s_ _obj' % clasname] = obj
        for name, method in inspect.getmembers(obj, inspect.ismethod):
            if name not in ignore and method not in ignore:
                self._ _methods[name] = wrap_callable(method, before, after)
    def _ _getattr_ _(self, name):
        try:
            return self._ _methods[name]
        except KeyError:
            return getattr(self._ _obj, name)
    def _ _setattr_ _(self, name, value):
        setattr(self._ _obj, name, value)

Using these simple but general tools, synchronization becomes easy:

class SynchronizedObject(GenericWrapper):
    ''' wrap an object and all of its methods with synchronization '''
    def _ _init_ _(self, obj, ignore=( ), lock=None):
        if lock is None:
            import threading
            lock = threading.RLock( )
        GenericWrapper._ _init_ _(self, obj, lock.acquire, lock.release, ignore)

Discussion

As per usual Python practice, we can complete this module with a small self-test, executed only when the module is run as main script. This snippet also serves to show how the module’s functionality can be used:

if _ _name_ _ == '_ _main_ _':
    import threading
    import time
    class Dummy(object):
        def foo(self):
            print 'hello from foo'
            time.sleep(1)
        def bar(self):
            print 'hello from bar'
        def baaz(self):
            print 'hello from baaz'
    tw = SynchronizedObject(Dummy( ), ignore=['baaz'])
    threading.Thread(target=tw.foo).start( )
    time.sleep(0.1)
    threading.Thread(target=tw.bar).start( )
    time.sleep(0.1)
    threading.Thread(target=tw.baaz).start( )

Thanks to the synchronization, the call to bar runs only when the call to foo has completed. However, because of the ignore= keyword argument, the call to baaz bypasses synchronization and thus completes earlier. So the output is:

hello from foo
hello from baaz
hello from bar

When you find yourself using the same single-lock locking code in almost every method of an object, use this recipe to refactor the locking away from the object’s application-specific logic. The key effect you get by applying this recipe is to effectively replace each method with:

self.lock.acquire( )
try:  # The "real" application code for the method
finally:
    self.lock.release( )

This code idiom is, of course, the right way to express locking: the try/finally statement ensures that the lock gets released in any circumstance, whether the application code terminates correctly or raises an exception. You’ll note that factory wrap_callable returns a closure, which is carefully coded in exactly this way!

To some extent, this recipe can also be handy when you want to postpone worrying about a class’ locking behavior. However, if you intend to use this code for production purposes, you should understand all of it. In particular, this recipe does not wrap direct accesses (for getting or setting) to the object’s attributes. If you want such direct accesses to respect the object’s lock, you need to add the try/finally locking idiom to the wrapper’s _ _getattr_ _ and _ _setattr_ _ special methods, around the calls these methods make to the getattr and setattr built-in functions, respectively. I normally don’t find that depth of wrapping to be necessary in my applications. (The way I code, wrapping just the methods proves sufficient.)

If you’re into custom metaclasses, you may be surprised that I do not offer a metaclass for these synchronization purposes. However, wrapping is a more dynamic and flexible approach—for example, an object can exist in both wrapped (synchronized) and unwrapped (raw) incarnations, and you can use the most appropriate one case by case. You pay for wrapping’s flexibility with a little bit more runtime overhead at each method call, but compared to the large costs of acquiring and releasing locks I don’t think this tiny extra overhead matters. Meanwhile, this recipe shows off, and effectively reuses, a wrapper-closure factory and a wrapper class that demonstrate how easy Python makes it to implement that favorite design pattern of Aspect-Oriented Programming’s fans, the insertion of “before-and-after” calls around every call to an object’s methods.

See Also

Documentation of the standard library modules threading and inspect in the Library Reference and Python in a Nutshell.

9.2. Terminating a Thread

Credit: Doug Fort

Problem

You must terminate a thread from the outside, but Python doesn’t let one thread just brutally kill another, so you need to use a suitable controlled-termination idiom.

Solution

A frequently asked question is: How do I kill a thread? The answer is: You don’t. Instead, you kindly ask it to go away. Each thread must periodically check whether it’s been asked to go away and then comply (typically after some kind of cleanup). Here is an example:

import threading
class TestThread(threading.Thread):
    def _ _init_ _(self, name='TestThread'):
        """ constructor, setting initial variables """
        self._stopevent = threading.Event( )
        self._sleepperiod = 1.0
        threading.Thread._ _init_ _(self, name=name)
    def run(self):
        """ main control loop """
        print "%s starts" % (self.getName( ),)
        count = 0
        while not self._stopevent.isSet( ):
            count += 1
            print "loop %d" % (count,)
            self._stopevent.wait(self._sleepperiod)
        print "%s ends" % (self.getName( ),)
    def join(self, timeout=None):
        """ Stop the thread and wait for it to end. """
        self._stopevent.set( )
        threading.Thread.join(self, timeout)
if _ _name_ _ == "_ _main_ _":
    testthread = TestThread( )
    testthread.start( )
    import time
    time.sleep(5.0)
    testthread.join( )

Discussion

You often want to exert some control on a thread from the outside, but the ability to kill a thread is, well, overkill. Python doesn’t give you this ability, and thus forces you to design your thread systems more carefully. This recipe is based on the idea of a thread whose main function uses a loop. Periodically, the loop checks if a threading.Event object has been set. If so, the thread terminates; otherwise, it waits for the object.

The TestThread class in this recipe also overrides threading.Thread’s join method. Normally, join waits only for a certain thread to terminate (for up to a specified amount of time, if any) without doing anything to cause that termination. In this recipe, however, join is overridden to set the stop event object before delegating the rest of its operation to the normal (base class) join method. Therefore, in this recipe, the join call is guaranteed to terminate the target thread within a short amount of time.

You can use the recipe’s central idea (a loop periodically checking a threading.Event to determine whether it must terminate) in several other, slightly different ways. The Event’s wait method can let you pause the target thread. You can also expose the Event, letting controller code set it and then go on its merry way without bothering to join the thread, knowing the thread will terminate in a short amount of time. Once the event is exposed, you may choose to use the same event to request the termination of more than one thread—for example, all threads in a certain thread pool might stop when one event object they all share is set. The simplicity of this recipe provides the modest amount of control I need, with no headaches, so I haven’t pursued the more sophisticated (and complicated) ideas.

Python also lets you terminate a thread in another way: by raising an exception in that thread. This “rougher” approach also has its limits: it cannot interrupt a blocking call to the operating system, and it could fail to work if the thread you want to terminate is executing a try clause whose except clauses are too broad. Despite its limits, this approach can still sometimes be useful, when you’re essentially writing a debugger: that is, when you cannot count on the code executing in the target thread to be well written, but you can hope the code is not written in an utterly disastrous way. The normal way to make use of this functionality is by running the possibly-buggy code in the main thread, after spawning a separate monitoring thread to keep an eye on things. If the monitoring thread decides the time has come to terminate the code that is currently running in the main thread, the monitoring thread can call thread.interrupt_main, passing as the argument the desired exception class.

Once in a blue moon, the debugger you’re writing cannot run the possibly-buggy code in the process’ main thread, typically because that thread is required for other uses by some other framework you depend on, such as your GUI code. To support such remote eventualities, the Python interpreter has a function that can raise an exception in any thread, given the target thread’s ID. However, this specialized functionality is intended for a tiny subset of that tiny subset of Python applications that are debuggers. To avoid tempting all other Python programmers (well over 99.9%) into misusing this approach for any other case of thread termination, the function is not directly callable from Python code: rather, the function is only exposed as a part of Python’s C API. This special function’s name is PyThreadState_SetAsyncExc, and the function’s two arguments are the target thread’s ID and the class of the desired exception. If you are writing a Python debugger with such peculiar needs, no doubt you already have, as part of your code, at least one C-coded Python extension module that supplies to your higher-level Python code other tidbits of peculiar, low-level functionality. Just add to your C code, a Python-callable function that in turn calls PyThreadState_SetAsyncExc, and your debugger will gain this peculiar but useful functionality.

See Also

Documentation of the standard library module threading in the Library Reference and Python in a Nutshell.

9.3. Using a Queue.Queue as a Priority Queue

Credit: Simo Salminen, Lee Harr, Mark Moraes, Chris Perkins, Greg Klanderman

Problem

You want to use a Queue.Queue instance, since it is the best way to communicate among threads. However, you need the additional functionality of being able to specify a priority value associated with each item on the queue, so that items with a lower (more urgent) priority value are fetched before others with a higher (less urgent) priority value.

Solution

Among its many advantages, Queue.Queue offers an elegant architecture that eases subclassing for purposes of specializing queueing behavior. Specifically, Queue.Queue exposes several methods specifically designed to be overridden in a subclass, to get specialized queueing behavior without worrying about synchronization issues.

We can exploit this elegant architecture and module heapq from the Python Standard Library to build the needed priority-queue functionality pretty easily. However, we also need to shadow and wrap Queue.Queue’s put and get methods, to decorate each item with its priority and posting time upon put, and strip off these decorations upon get:

import Queue, heapq, time
class PriorityQueue(Queue.Queue):
   # Initialize the queue
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = [  ]
    # Return the number of items that are currently enqueued
    def _qsize(self):
        return len(self.queue)
    # Check whether the queue is empty
    def _empty(self):
        return not self.queue
    # Check whether the queue is full
    def _full(self):
        return self.maxsize > 0 and len(self.queue) >= self.maxsize
    # Put a new item in the queue
    def _put(self, item):
        heapq.heappush(self.queue, item)
    # Get an item from the queue
    def _get(self):
        return heapq.heappop(self.queue)
    # shadow and wrap Queue.Queue's own `put' to allow a 'priority' argument
    def put(self, item, priority=0, block=True, timeout=None):
        decorated_item = priority, time.time( ), item
        Queue.Queue.put(self, decorated_item, block, timeout)
    # shadow and wrap Queue.Queue's own `get' to strip auxiliary aspects
    def get(self, block=True, timeout=None):
        priority, time_posted, item = Queue.Queue.get(self, block, timeout)
        return item

Discussion

Given an instance q of this recipe’s PriorityQueue class, you can call q.put(anitem) to enqueue an item with “normal” priority (here defined as 0), or q.put(anitem, prio) to enqueue an item with a specific priority prio. At the time q.get() gets called (presumably in another thread), items with the lowest priority will be returned first, bypassing items with higher priority. Negative priorities are lower than “normal”, thus suitable for “urgent” items; positive priorities, higher than “normal”, indicate items that may wait longer, since other items with “normal” priority will get fetched before them. Of course, if you’re not comfortable with this conception of priorities, nothing stops you from altering this recipe’s code accordingly: for example, by changing sign to the priority value when you build the decorated_item at the start of method put. If you do so, items posted with positive priority will become the urgent ones and items posted with negative priority will become the can-wait-longer ones.

Queue.Queue’s architecture deserves study, admiration, and imitation. Not only is Queue.Queue, all on its own, the best way to architect communication among threads, but this same class is also designed to make it easy for you to subclass and specialize it with queueing disciplines different from its default FIFO (first-in, first-out), such as the priority-based queueing discipline implemented in this recipe. Specifically, Queue.Queue uses the wonderful Template Method Design Pattern (http://www.aleax.it/Python/os03_template_dp.pdf ). This DP enables Queue.Queue itself to take care of the delicate problems connected with locking, while delegating the queueing discipline to specific methods _put, _get, and so on, which may be overridden by subclasses; such hook methods then get called in a context where synchronization issues are not a concern.

In this recipe, we also need to override Queue.Queue’s put and get methods, because we need to add a priority optional argument to put’s signature, decorate the item before we put it on the queue (so that the heapq module’s mechanisms will produce the order we want—lowest priority first, and, among items posted with equal priority, FIFO ordering), and undecorate each decorated item that we get back from the queue to return the naked item. All of these auxiliary tweaks use nothing but local variables, however, so they introduce no synchronization worries whatsoever. Each thread gets its own stack; therefore, any code that uses nothing but local variables (and thus cannot possibly alter any state accessible from other threads, or access any state that other threads might alter) is inherently thread-safe.

See Also

Modules Queue and heapq of the Python Standard Library are documented in Library Reference and Python in a Nutshell; the Template Method Design Pattern is illustrated at http://www.strakt.com/docs/os03_template_dp.pdf; Recipe 19.14, and Recipe 5.7, show other examples of coding and using priority queues.

9.4. Working with a Thread Pool

Credit: John Nielsen, Justin A

Problem

You want your main thread to be able to farm out processing tasks to a pool of worker threads.

Solution

The Queue.Queue type is the simplest and most effective way to coordinate a pool of worker threads. We could group all the needed data structures and functions into a class, but there’s no real need to. So, here they are, shown as globals instead:

import threading, Queue, time, sys
# Globals (start with a capital letter)
Qin  = Queue.Queue( )
Qout = Queue.Queue( )
Qerr = Queue.Queue( )
Pool = [  ]
def report_error( ):
    ''' we "report" errors by adding error information to Qerr '''
    Qerr.put(sys.exc_info( )[:2])
def get_all_from_queue(Q):
    ''' generator to yield one after the others all items currently
        in the Queue Q, without any waiting
    '''
    try:
        while True:
            yield Q.get_nowait( )
    except Queue.Empty:
        raise StopIteration
def do_work_from_queue( ):
    ''' the get-some-work, do-some-work main loop of worker threads '''
    while True:
        command, item = Qin.get( )       # implicitly stops and waits
        if command == 'stop':
            break
        try:
            # simulated work functionality of a worker thread
            if command == 'process':
                result = 'new' + item
            else:
                raise ValueError, 'Unknown command %r' % command
        except:
            # unconditional except is right, since we report _all_ errors
            report_error( )
        else:
            Qout.put(result)
def make_and_start_thread_pool(number_of_threads_in_pool=5, daemons=True):
    ''' make a pool of N worker threads, daemonize, and start all of them '''
    for i in range(number_of_threads_in_pool):
         new_thread = threading.Thread(target=do_work_from_queue)
         new_thread.setDaemon(daemons)
         Pool.append(new_thread)
         new_thread.start( )
def request_work(data, command='process'):
    ''' work requests are posted as (command, data) pairs to Qin '''
    Qin.put((command, data))
def get_result( ):
    return Qout.get( )     # implicitly stops and waits
def show_all_results( ):
    for result in get_all_from_queue(Qout):
        print 'Result:', result
def show_all_errors( ):
    for etyp, err in get_all_from_queue(Qerr):
        print 'Error:', etyp, err
def stop_and_free_thread_pool( ):
    # order is important: first, request all threads to stop...:
    for i in range(len(Pool)):
        request_work(None, 'stop')
    # ...then, wait for each of them to terminate:
    for existing_thread in Pool:
        existing_thread.join( )
    # clean up the pool from now-unused thread objects
    del Pool[:]

Discussion

It is generally a mistake to architect a multithreading program on the premise of having it spawn arbitrarily high numbers of threads as needed. Most often, the best architecture for such a program is based on farming out work to a fixed and relatively small number of worker threads—an arrangement known as a thread pool. This recipe shows a very simple example of a thread pool, focusing on the use of Queue.Queue instances as the most useful and simplest way for inter-thread communication and synchronization.

In this recipe, worker threads run function do_work_from_queue, which has the right structure for a typical worker thread but does really minimal “processing” (just as an example). In this case, the worker thread computes a “result” by prepending the string 'new' to each arriving item (note that this implicitly assumes that arriving items are strings). In your applications, of course, you will have, in the equivalent of this do_work_from_queue function, more substantial processing, and quite possibly different kinds of processing depending on the value of the command parameter.

In addition to the worker threads in the pool, a multithreading program often has other specialized threads for various purposes, such as interfacing to various entities external to the program (a GUI, a database, a library that is not guaranteed to be thread-safe). In this recipe, such specialized threads are not shown. However, it does include at least a “main thread”, which starts and stops the thread pool, determines the units of work to be farmed out, and eventually gathers all results and any errors that may have been reported.

In your applications, you may or may not want to start and stop the thread pool repeatedly. Most typically, you may start the pool as a part of your program’s initialization, leave it running throughout, and stop it, if at all, only as a part of your program’s final cleanup. If you set your worker threads as “daemons”, as this recipe’s function make_and_start_thread_pool sets them by default, it means that your program will not continue running when only worker threads are left. Rather, your program will terminate as soon as the main thread terminates. Again, this arrangement is a typically advisable architecture. At any rate, the recipe also provides a function stop_and_free_thread_pool, just in case you do want to terminate and clean up your thread pool at some point (and possibly later make and restart another one with another call to make_and_start_thread_pool).

An example use of the functionality in this recipe might be:

for i in ('_ba', '_be', '_bo'): request_work(i)
make_and_start_thread_pool( )
stop_and_free_thread_pool( )
show_all_results( )
show_all_errors( )

The output from this snippet should normally be:

Result: new_ba
Result: new_be
Result: new_bo

although it’s possible (but quite unlikely) that two of the results might end up exchanged. (If ordering of results is important to you, be sure to add a progressive number to the work requests you post from the main thread, and report it back to the main thread as part of each result or error.)

Here is a case where an error occurs and gets reported:

for i in ('_ba', 7, '_bo'): request_work(i)
make_and_start_thread_pool( )
stop_and_free_thread_pool( )
show_all_results( )
show_all_errors( )

The output from this snippet should normally be (net of an extremely unlikely, but not impossible, exchange between the two “Result” lines):

Result: new_ba
Result: new_bo
Error: exceptions.TypeError cannot concatenate 'str' and 'int' objects

The worker thread that gets the item 7 reports a TypeError because it tries to concatenate the string 'new' with this item, which is an int—an invalid operation. Not to worry: we have the try/except statement in function do_work_from_queue exactly to catch any kind of error, and Queue Qerr and functions report_error and show_all_errors exactly to ensure that errors do not pass silently, unless explicitly silenced, which is a key point of Python’s general approach to programming.

See Also

Library Reference docs on threading and Queue modules; Python in a Nutshell chapter on threads.

9.5. Executing a Function in Parallel on Multiple Argument Sets

Credit: Guy Argo

Problem

You want to execute a function simultaneously over multiple sets of arguments. (Presumably the function is “I/O bound”, meaning it spends substantial time doing input/output operations; otherwise, simultaneous execution would be useless.)

Solution

Use one thread for each set of arguments. For good performance, it’s best to limit our use of threads to a bounded pool:

import threading, time, Queue
class MultiThread(object):
    def _ _init_ _(self, function, argsVector, maxThreads=5, queue_results=False):
        self._function = function
        self._lock = threading.Lock( )
        self._nextArgs = iter(argsVector).next
        self._threadPool = [ threading.Thread(target=self._doSome)
                             for i in range(maxThreads) ]
        if queue_results:
            self._queue = Queue.Queue( )
        else:
            self._queue = None
    def _doSome(self):
        while True:
            self._lock.acquire( )
            try:
                try:
                    args = self._nextArgs( )
                except StopIteration:
                    break
            finally:
                self._lock.release( )
            result = self._function(args)
            if self._queue is not None:
                self._queue.put((args, result))
    def get(self, *a, **kw):
        if self._queue is not None:
            return self._queue.get(*a, **kw)
        else:
            raise ValueError, 'Not queueing results'
    def start(self):
        for thread in self._threadPool:
            time.sleep(0)    # necessary to give other threads a chance to run
            thread.start( )
    def join(self, timeout=None):
        for thread in self._threadPool:
            thread.join(timeout)
if _ _name_ _=="_ _main_ _":
    import random
    def recite_n_times_table(n):
        for i in range(2, 11):
            print "%d * %d = %d" % (n, i, n * i)
            time.sleep(0.3 + 0.3*random.random( ))
    mt = MultiThread(recite_n_times_table, range(2, 11))
    mt.start( )
    mt.join( )
    print "Well done kids!"

Discussion

This recipe’s MultiThread class offers a simple way to execute a function in parallel, on many sets of arguments, using a bounded pool of threads. Optionally, you can ask for results of the calls to the function to be queued, so you can retrieve them, but by default the results are just thrown away.

The MultiThread class takes as its arguments a function, a sequence of argument tuples for said function, and optionally a boundary on the number of threads to use in its pool and an indicator that results should be queued. Beyond the constructor, it exposes three methods: start, to start all the threads in the pool and begin the parallel evaluation of the function over all argument tuples; join, to perform a join on all threads in the pool (meaning to wait for all the threads in the pool to have terminated); and get, to get queued results (if it was instantiated with the optional flag queue_results set to True, to ask for results to be queued). Internally, class MultiThread uses its private method doSome as the target callable for all threads in the pool. Each thread works on the next available tuple of arguments (supplied by the next method of an iterator on the iterable whose items are such tuples, with the call to next being guarded by the usual locking idiom), until all work has been completed.

As is usual in Python, the module can also be run as a free-standing main script, in which case it runs a simple demonstration and self-test. In this case, the demonstration simulates a class of schoolchildren reciting multiplication tables as fast as they can.

Real use cases for this recipe mostly involve functions that are I/O bound, meaning functions that spend substantial time performing I/O. If a function is “CPU bound”, meaning the function spends its time using the CPU, you get better overall performance by performing the computations one after the other, rather than in parallel. In Python, this observation tends to hold even on machines that dedicate multiple CPUs to your program, because Python uses a GIL (Global Interpreter Lock), so that pure Python code from a single process does not run simultaneously on more than one CPU at a time.

Input/output operations release the GIL, and so can (and should) any C-coded Python extension that performs substantial computations without callbacks into Python. So, it is possible that parallel execution may speed up your program, but only if either I/O or a suitable C-coded extension is involved, rather than pure computationally intensive Python code. (Implementations of Python on different virtual machines, such as Jython, which runs on a JVM [Java Virtual Machine], or IronPython, which runs on the Microsoft .NET runtime, are of course not bound by these observations: these observations apply only to the widespread “classical Python”, meaning CPython, implementation.)

See Also

Library Reference and Python in a Nutshell docs on modules threading and Queue.

9.6. Coordinating Threads by Simple Message Passing

Credit: Michael Hobbs

Problem

You want to write a multithreaded application, using, as the synchronization and communication primitive, a simple yet powerful message-passing paradigm.

Solution

The candygram module lets you use concurrent programming semantics that are essentially equivalent to those of the Erlang language. To use candygram, you start by defining appropriate classes, such as the following one, to model your threads’ functionality:

import candygram as cg
class ExampleThread(object):
    """A thread-class with just a single counter value and a stop flag."""
    def _ _init_ _(self):
        """ Initialize the counter to 0, the running-flag to True. """
        self.val = 0
        self.running = True
    def increment(self):
        """ Increment the counter by one. """
        self.val += 1
    def sendVal(self, msg):
        """ Send current value of counter to requesting thread. """
        req = msg[0]
        req.send((cg.self( ), self.val))
    def setStop(self):
        """ Set the running-flag to False. """
        self.running = False
    def run(self):
        """ The entry point of the thread. """
        # Register the handler functions for various messages:
        r = cg.Receiver( )
        r.addHandler('increment', self.increment)
        r.addHandler((cg.Process, 'value'), self.sendVal, cg.Message)
        r.addHandler('stop', self.setStop)
        # Keep handling new messages until a stop has been requested
        while self.running:
            r.receive( )

To start a thread running this code under candygram, use:

counter = cg.spawn(ExampleThread( ).run)

To handle the counter thread’s responses, you need another Receiver object, with the proper handler registered:

response = cg.Receiver( )
response.addHandler((counter, int), lambda msg: msg[1], cg.Message)

And here is an example of how you might use these counter and response objects:

# Tell thread to increment twice
counter.send('increment')
counter.send('increment')
# Request the thread's current value, then print the thread's response
counter.send((cg.self( ), 'value'))
print response.receive( )
# Tell thread to increment one more time
counter.send('increment')
# Again, request the thread's current value, then print the thread's response
counter.send((cg.self( ), 'value'))
print response.receive( )
# Tell the thread to stop running
counter.send('stop')

Discussion

With the candygram module (http://candygram.sourceforge.net), Python developers can send and receive messages between threads using semantics nearly identical to those introduced in the Erlang language (http://www.erlang.org). Erlang is widely respected for its elegant built-in facilities for concurrent programming.

Erlang’s approach is simple and yet powerful. To communicate with another thread, simply send a message to it. You do not need to worry about locks, semaphores, mutexes, and other such primitives, to share information among concurrent tasks. Developers of multitasking software mostly use message passing only to implement a producer/consumer model. When you combine message passing with the flexibility of a Receiver object, however, it becomes much more powerful. For example, by using timeouts and message patterns, a thread may easily handle its messages as a state machine, or as a priority queue.

For those who wish to become more familiar with Erlang, http://www.erlang.org/download/erlang-book-part1.pdf (Concurrent Programming in Erlang) provides a very complete introduction. In particular, the candygram module implements all of the functions described in Chapter 5 and sections 7.2, 7.3, and 7.5 of that book.

This recipe offers a very elementary demonstration of how messages are passed between threads using candygram. When you run this recipe as a script, the print statements will output the values 2 and then 3.

It’s important to understand how the candygram.Receiver class works. The addHandler method requires at least two parameters: the first is a message pattern and the second is a handler function. The Receiver.receive method invokes a registered handler function, and returns that function’s result, whenever it finds a message that matches the associated pattern. Any parameters optionally passed to addHandler beyond the first two get passed as parameters to the handler function when the Receiver calls it. If a parameter is the candygram.Message constant, then receive replaces that parameter with the matching message when it calls the handler function.

This recipe’s code contains four different message patterns: 'increment', (cg.Process, 'value'), 'stop', and (counter, int). The 'increment' and 'stop' patterns are simple patterns that match any message that consists solely of the strings 'increment' and 'stop', respectively. The (cg.Process, 'value') pattern matches any message that is a tuple with two items, where the first item isinstance of cg.Process and the second item is the string value. Lastly, the (counter, int) pattern matches any message that is a tuple with two items where the first item is the counter object and the second element is an integer.

You can find more information about the Candygram package at http://candygram.sourceforge.net. At that URL, you can find all details on how to specify message patterns, how to set a timeout for the Receiver.receive method, and how to monitor the running status of spawned threads.

See Also

Concurrent Programming in Erlang at http://www.erlang.org/download/erlang-book-part1.pdf; the candygram home page at http://candygram.sourceforge.net.

9.7. Storing Per-Thread Information

Credit: John E. Barham, Sami Hangaslammi, Anthony Baxter

Problem

You need to allocate to each thread some storage that only that thread can use.

Solution

Thread-specific storage is a useful design pattern, and Python 2.3 did not yet support it directly. However, even in 2.3, we could code it up in terms of a dictionary protected by a lock. For once, it’s slightly more general, and not significantly harder, to program to the lower-level thread module, rather than to the more commonly useful, higher-level threading module that Python offers on top of it:

_tss = {  }
try:
    import thread
except ImportError:
    # We're running on a single-threaded platform (or, at least, the Python
    # interpreter has not been compiled to support threads), so we just return
    # the same dict for every call -- there's only one thread around anyway!
    def get_thread_storage( ):
        return _tss
else:
    # We do have threads; so, to work:
    _tss_lock = thread.allocate_lock( )
    def get_thread_storage( ):
        """ Return a thread-specific storage dictionary. """
        thread_id = thread.get_ident( )
        _tss_lock.acquire( )
        try:
            return _tss.set_default(thread_id, {  })
        finally:
            _tss_lock.release( )

Python 2.4 offers a much simpler and faster implementation, of course, thanks to the new threading.local function:

try:
    import threading
except ImportError:
    import dummy_threading as threading
_tss = threading.local( )
def get_thread_storage( ):
    return _tss._ _dict_ _

Discussion

The main benefit of multithreaded programs is that all of the threads can share global objects when they need to do so. Often, however, each thread also needs some storage of its own—for example, to store a network or database connection unique to itself. Indeed, each such externally oriented object is generally best kept under the control of a single thread, to avoid multiple possibilities of highly peculiar behavior, race conditions, and so on. The get_thread_storage function in this recipe solves this problem by implementing the “thread-specific storage” design pattern, and specifically by returning a thread-specific storage dictionary. The calling thread can then use the returned dictionary to store any kind of data that is private to the thread. This recipe is, in a sense, a generalization of the get_transaction function from ZODB, the object-oriented database underlying Zope.

One possible extension to this recipe is to add a delete_thread_storage function. Such a function would be useful, particularly if a way could be found to automate its being called upon thread termination. Python’s threading architecture does not make this task particularly easy. You could spawn a watcher thread to do the deletion after a join with the calling thread, but that’s a rather heavyweight approach. The recipe as presented, without deletion, is quite appropriate for the common and recommended architecture in which you have a pool of (typically daemonic) worker threads (perhaps some of them general workers, with others dedicated to interfacing to specific external resources) that are spawned at the start of the program and do not go away until the end of the whole process.

When multithreading is involved, implementation must always be particularly careful to detect and prevent race conditions, deadlocks, and other such conflicts. In this recipe, I have decided not to assume that a dictionary’s set_default method is atomic (meaning that no thread switch can occur while set_default executes)—adding a key can potentially change the dictionary’s whole structure, after all. If I was willing to make such an assumption, I could do away with the lock and vastly increase performance, but I suspect that such an assumption might make the code too fragile and dependent on specific versions of Python. (It seems to me that the assumption holds for Python 2.3, but, even if that is the case, I want my applications to survive subtle future changes to Python’s internals.) Another risk is that, if a thread terminates and a new one starts, the new thread might end up with the same thread ID as the just-terminated one, and therefore accidentally share the “thread-specific storage” dictionary left behind by the just-terminated thread. This risk might be mitigated (though not eliminated) by providing the delete_thread_storage function mentioned in the previous paragraph. Again, this specific problem does not apply to me, given the kind of multithreading architecture that I use in my applications. If your architecture differs, you may want to modify this recipe’s solution accordingly.

If the performance of this recipe’s version is insufficient for your application’s needs, due to excessive overhead in acquiring and releasing the lock, then, rather than just removing the lock at the risk of making your application fragile, you might consider an alternative:

_creating_threads = True
_tss_lock = thread.allocate_lock( )
_tss = {  }
class TssSequencingError(RuntimeError): pass
def done_creating_threads( ):
    """ switch from thread-creation to no-more-threads-created state """
    global _creating_threads
    if not _creating_threads:
        raise TssSequencingError('done_creating_threads called twice')
    _creating_threads = False
def get_thread_storage( ):
    """ Return a thread-specific storage dictionary. """
    thread_id = thread.get_ident( )
    # fast approach if thread-creation phase is finished
    if not _creating_threads: return _tss[thread_id]
    # careful approach if we're still creating threads
    try:
        _tss_lock.acquire( )
        return _tss.setdefault(thread_id, {  })
    finally:
        _tss_lock.release( )

This variant adds a boolean switch _creating_threads, initially True. As long as the switch is True, the variant uses a careful locking-based approach, quite similar to the one presented in this recipe’s Solution. At some point in time, when all threads that will ever exist (or at least all that will ever require access to get_thread_storage) have been started, and each of them has obtained its thread-local storage dictionary, your application calls done_creating_threads. This sets _creating_threads to False, and every future call to get_thread_storage then takes a fast path where it simply indexes into global dictionary _tss—no more acquiring and releasing the lock, no more creating a thread’s thread-local storage dictionary if it didn’t yet exist.

As long as your application can determine a moment in which it can truthfully call done_creating_threads, the variant in this subsection should definitely afford a substantial increase in speed compared to this recipe’s Solution. Note that it is particularly likely that you can use this variant if your application follows the popular and recommended architecture mentioned previously: a bounded set of daemonic, long-lived worker threads, all created early in your program. This is fortunate, because, if your application is performance-sensitive enough to worry about the locking overhead of this recipe’s solution, then no doubt you will want to structure your application that way. The alternative approach of having many short-lived threads is generally quite damaging to performance.

If your application needs to run only under Python 2.4, you can get a much simpler, faster, and solid implementation by relying on the new threading.local function. threading.local returns a new object on which any thread can get and set arbitrary attributes, independently from whatever getting and setting other threads may be doing on the same object. This recipe, in the 2.4 variant, returns the per-thread _ _dict_ _ of such an object, for uniformity with the 2.3 variant. This way, your applications can be made to run on both Python 2.3 and 2.4, using the best version in each case:

import sys
if sys.version >= '2.4': # insert 2.4 definition of get_local_storage here
else:
 # insert 2.3 definition of get_local_storage here

The 2.4 variant of this recipe also shows off the intended use of module dummy_threading, which, like its sibling dummy_thread, is also available in Python 2.3. By conditionally using these dummy modules, which are available on all platforms, whether or not Python was compiled with thread support, you may sometimes, with due care, be able to write applications that can run on any platform, taking advantage of threading where it’s available but running anyway even where threading is not available. In the 2.3 variant, we did not use the similar approach based on dummy_thread, because the overhead would be too high to pay on nonthreaded platforms; in the 2.4 variant, overhead is pretty low anyway, so we went for the simplicity that dummy_threading affords.

See Also

For an exhaustive treatment of the design pattern that describes thread-specific storage (albeit aimed at C++ programmers), see Douglas Schmidt, Timothy Harrisson, Nat Pryce, Thread-Specific Storage: An Object Behavioral Pattern for Efficiently Accessing per-Thread State (http://www.cs.wustl.edu/~schmidt/PDF/TSS-pattern.pdf); the Library Reference documentation dummy_thread, dummy_threading, and Python 2.4’s threading.local; ZODB at http://zope.org/Wikis/ZODB/FrontPage.

9.8. Multitasking Cooperatively Without Threads

Credit: Brian Bush, Troy Melhase, David Beach, Martin Miller

Problem

You have a task that seems suited to multithreading, but you don’t want to incur the overhead that real thread-switching would impose.

Solution

Generators were designed to simplify iteration, but they’re also quite suitable as a basis for cooperative multitasking, also known as microthreading:

import signal
# credit: original idea was based on an article by David Mertz
# http://gnosis.cx/publish/programming/charming_python_b7.txt
# some example 'microthread' generators
def empty(name):
    """ This is an empty task for demonstration purposes. """
    while True:
        print "<empty process>", name
        yield None
def terminating(name, maxn):
    """ This is a counting task for demonstration purposes. """
    for i in xrange(maxn):
        print "Here %s, %s out of %s" % (name, i, maxn)
        yield None
    print "Done with %s, bailing out after %s times" % (name, maxn)
def delay(duration=0.8):
    """ Do nothing at all for 'duration' seconds. """
    import time
    while True:
        print "<sleep %d>" % duration
        time.sleep(duration)
        yield None
class GenericScheduler(object):
    def _ _init_ _(self, threads, stop_asap=False):
        signal.signal(signal.SIGINT, self.shutdownHandler)
        self.shutdownRequest = False
        self.threads = threads
        self.stop_asap = stop_asap
    def shutdownHandler(self, n, frame):
        """ Initiate a request to shutdown cleanly on SIGINT."""
        print "Request to shut down."
        self.shutdownRequest = True
    def schedule(self):
        def noop( ):
            while True: yield None
        n = len(self.threads)
        while True:
            for i, thread in enumerate(self.threads):
                try: thread.next( )
                except StopIteration:
                    if self.stop_asap: return
                    n -= 1
                    if n==0: return
                    self.threads[i] = noop( )
                if self.shutdownRequest:
                    return
if _ _name_ _== "_ _main_ _":
    s = GenericScheduler([ empty('boo'), delay( ), empty('foo'),
                           terminating('fie', 5), delay(0.5),
                        ], stop_asap=True)
    s.schedule( )
    s = GenericScheduler([ empty('boo'), delay( ), empty('foo'),
                           terminating('fie', 5), delay(0.5),
                        ], stop_asap=False)
    s.schedule( )

Discussion

Microthreading (or cooperative multitasking) is an important technique. If you want to pursue it in earnest for complex uses, you should definitely look up the possibilities of Christian Tismer’s Stackless, a Python version specialized for microthreading, at http://www.stackless.com/. However, you can get a taste of cooperative multitasking without straying from Python’s core, by making creative use of generators, as shown in this recipe.

A simple approach to cooperative multitasking, such as the one presented in this recipe, is not suitable when your tasks must perform long-running work, particularly I/O tasks that may involve blocking system calls. For such applications, look into real threading, or, as a strong alternative, look into the event-driven approach offered by module asyncore in the Python Standard Library (on a simple scale) and by package Twisted at http://twistedmatrix.com/products/twisted (on a grandiose scale). But if your application has modest I/O needs, and you can slice up any computation your tasks perform into short chunks, each of which you can end with a yield, this recipe may be just what you’re looking for.

See Also

David Mertz’s site, chock-full of idiosyncratic, fascinating ideas, is at http://gnosis.cx/; Christian Tismer’s Stackless Python, the best way to do cooperative multitasking in Python (and much else besides), is at http://www.stackless.com/; Twisted Matrix, the best way to do event-driven (asynchronous) programming, is at http://twistedmatrix.com/.

9.9. Determining Whether Another Instanceof a Script Is Already Running in Windows

Credit: Bill Bell

Problem

In a Windows environment, you want to ensure that only one instance of a script is running at any given time.

Solution

Many tricks can be used to avoid starting multiple copies of an application, but they’re all quite fragile—except those based on a mutual-exclusion (mutex) kernel object, such as this one. Mark Hammond’s precious PyWin32 package supplies all the needed hooks into the Windows APIs to let us exploit a mutex for this purpose:

from win32event import CreateMutex
from win32api import GetLastError
from winerror import ERROR_ALREADY_EXISTS
from sys import exit
handle = CreateMutex(None, 1, 'A unique mutex name')
if GetLastError( ) == ERROR_ALREADY_EXISTS:
    # Take appropriate action, as this is the second
    # instance of this script; for example:
    print 'Oh! dear, I exist already.'
    exit(1)
else:
    # This is the only instance of the script; let
    # it do its normal work.  For example:
    from time import sleep
    for i in range(10):
        print "I'm running",i
        sleep(1)

Discussion

The string 'A unique mutex name' must be chosen to be unique to this script, and it must not be dynamically generated, because the string must have the same value for all potential simultaneous instances of the same script. A fresh, globally unique ID that you manually generate and insert at script-authoring time would be a good choice. According to the Windows documentation, the string can contain any characters except backslashes (). On Windows platforms that implement Terminal Services, you can optionally prefix the string with Global or Local, but such prefixes would make the string invalid for most versions of Windows, including NT, 95, 98, and ME.

The Win32 API call CreateMutex creates a Windows kernel object of the mutual-exclusion (mutex) kind and returns a handle to it. Note that we do not close this handle, because it needs to exist throughout the time this process is running. It’s important to let the Windows kernel take care of removing the handle (and the object it indicates, if the handle being removed is the only handle to that kernel object) when our process terminates.

The only thing we really care about is the return code from the API call, which we obtain by calling the GetLastError API right after it. That code is ERROR_ALREADY_EXISTS if and only if the mutual-exclusion object we tried to create already exists (i.e., if another instance of this script is already running).

This approach is perfectly safe and not subject to race conditions and similar anomalies, even if two instances of the script are trying to start at the same time (a reasonably frequent occurrence, e.g., if the user erroneously double-clicks in an Active Desktop setting where a single click already starts the application). The Windows specifications guarantee that only one of the instances will create the mutex, while the other will be informed that the mutex already exists. Mutual exclusion is therefore guaranteed by the Windows kernel itself, and the recipe is as solid as the operating system.

See Also

Documentation for the Win32 API in PyWin32 (http://starship.python.net/crew/mhammond/win32/Downloads.html) or ActivePython (http://www.activestate.com/ActivePython/); Windows API documentation available from Microsoft (http://msdn.microsoft.com); Python Programming on Win32, by Mark Hammond and Andy Robinson (O’Reilly).

9.10. Processing Windows Messages Using MsgWaitForMultipleObjects

Credit: Michael Robin

Problem

In a Win32 application, you need to process messages, but you also want to wait for kernel-level waitable objects, and coordinate several activities.

Solution

A Windows application’s message loop, also known as its message pump, is at the heart of Windows. It’s worth some effort to ensure that the heart beats properly and regularly:

import win32event
import pythoncom
TIMEOUT = 200 # ms
StopEvent = win32event.CreateEvent(None, 0, 0, None)
OtherEvent = win32event.CreateEvent(None, 0, 0, None)
class myCoolApp(object):
    def OnQuit(self):
          # assume 'areYouSure' is a global function that makes a final
          # check via a message box, a fancy dialog, or whatever else!
          if areYouSure( ):
              win32event.SetEvent(StopEvent) # Exit msg pump
def _MessagePump( ):
     waitables = StopEvent, OtherEvent
     while True:rc = win32event.MsgWaitForMultipleObjects(
               waitables,
             ,       # Wait for all = false, so it waits for any one
             TIMEOUT, # (or win32event.INFINITE)
             win32event.QS_ALLEVENTS) # Accept all kinds of events
         # You can call a function here, if it doesn't take too long. It will
         # be executed at least every TIMEOUT ms -- possibly a lot more often,
         # depending on the number of Windows messages received.
         if rc == win32event.WAIT_OBJECT_0:
             # Our first event listed, the StopEvent, was triggered, so
             # we must exit, terminating the message pump
             break
         elif rc == win32event.WAIT_OBJECT_0+1:
             # Our second event listed, "OtherEvent", was set. Do
             # whatever needs to be done -- you can wait on as many
             # kernel-waitable objects as needed (events, locks,
             # processes, threads, notifications, and so on).
             pass
         elif rc == win32event.WAIT_OBJECT_0+len(waitables):
             # A windows message is waiting - take care of it. (Don't
             # ask me why a WAIT_OBJECT_MSG isn't defined <
             # WAIT_OBJECT_0...!).
             # This message-serving MUST be done for COM, DDE, and other
             # Windows-y things to work properly!
             if pythoncom.PumpWaitingMessages( ):
                  break # we received a wm_quit message
         elif rc == win32event.WAIT_TIMEOUT:
             # Our timeout has elapsed.
             # Do some work here (e.g, poll something you can't thread)
             # or just feel good to be alive.
             pass
         else:
             raise RuntimeError("unexpected win32wait return value")

Discussion

Most Win32 applications must process messages, but you often want to wait on kernel waitables and coordinate a lot of things going on at the same time. A good message pump structure is the key to this, and this recipe exemplifies a reasonably simple but pretty effective one.

With the message pump shown in this recipe, messages and other events get dispatched as soon as they are posted, and a timeout allows you to poll other components. You may need to poll if the proper calls or event objects are not exposed in your Win32 event loop, as many components insist on running only on the application’s main thread and cannot run on spawned (secondary) threads.

You can add many other refinements, just as you can to any other Win32 message-pump approach. Python lets you do this with as much precision as C does, thanks to Mark Hammond’s PyWin32 package (which used to be known as win32all). However, the relatively simple message pump presented in this recipe is already a big step up from the typical naive application that can either serve its message loop or wait on kernel waitables, but not both.

The key to this recipe is the Windows API call MsgWaitForMultipleObjects, which takes several parameters. The first is a tuple of kernel objects you want to wait for. The second parameter is a flag that is normally 0. The value 1 indicates that you should wait until all the kernel objects in the first parameter are signaled, but my experience suggests that you almost invariably want to stop waiting when any one of these objects is signaled, so this parameter will almost always be 0. The third is a flag that specifies which Windows messages you want to interrupt the wait; always pass win32event.QS_ALLEVENTS here, to make sure any Windows message interrupts the wait. The fourth parameter is a timeout period (in milliseconds), or win32event.INFINITE if you are sure you do not need to do any periodic polling.

This function is a polling loop and, sure enough, it loops (with a while True, which is terminated only by a break within it). At each leg of the loop, it calls the API that waits for multiple objects. When that API stops waiting, it returns a code that explains why it stopped waiting. A value between win32event.WAIT_OBJECT_0 and win32event.WAIT_OBJECT_0+N-1 (where N is the number of waitable kernel objects in the tuple you passed as the first parameter), inclusive, means that the wait finished because an object was signaled (being signaled means different things for each kind of waitable kernel object). The return code’s difference from win32event.WAIT_OBJECT_0 is the index of the relevant object in the tuple.

A return value of win32event.WAIT_OBJECT_0+N means that the wait finished because a message was pending, and in this case, our recipe processes all pending Windows messages via a call to pythoncom.PumpWaitingMessages. (That function, in turn, returns a true result if a WM_QUIT message was received, so in this case, we break out of the whole while loop.) A code of win32event.WAIT_TIMEOUT means the wait finished because of a timeout, so we can do our polling there. In this case, no message is waiting, and none of our kernel objects of interest were signaled.

Basically, the way to tune this recipe for yourself is by using the right kernel objects as waitables (with an appropriate response to each) and by doing whatever you need to do periodically in the polling case. While this means you must have some detailed understanding of Win32, of course, it’s still quite a bit easier than designing your own special-purpose, message-loop function from scratch.

I suspect that a purist would find some way or other to wrap all of this message pumping into a neat module, letting each application customize its use of the module by passing in a list of waitables, some dictionary to map different waitables to chunks of code to execute, and a partridge in a pear tree. Go ahead, turn it all into a custom metaclass if you wish, see if I care. For once, though, I think the right approach to reusing this code is to copy it into your application’s source directories, and use your trusty text editor (gasp!) to tailor the message pump to your application’s exact needs.

See Also

Documentation for the Win32 API in PyWin32 (http://starship.python.net/crew/mhammond/win32/Downloads.html) or ActivePython (http://www.activestate.com/ActivePython/); Windows API documentation available from Microsoft (http://msdn.microsoft.com); Mark Hammond and Andy Robinson, Python Programming on Win32 (O’Reilly).

9.11. Driving an External Process with popen

Credit: Sébastien Keim, Tino Lange, Noah Spurrier

Problem

You want to drive an external process that accepts commands from its standard input, and you don’t care about the responses (if any) that the external process may emit on its standard output.

Solution

If you need to drive only the other process’ input and don’t care about its output, the simple os.popen function is enough. For example, here is a way to do animated graphics by driving the free program gnuplot via os.popen:

import os
f = os.popen('gnuplot', 'w')
print >>f, "set yrange[-300:+300]"
for n in range(300):
    print >>f, "plot %i*cos(x)+%i*log(x+10)" % (n, 150-n)
    f.flush( )
f.close( )

Discussion

When you want to use Python as a glue language, sometimes (in particularly easy cases) the simple function popen (from the standard library module os) may be all you need. Specifically, os.popen may suffice when you need to drive an external program that accepts commands on its standard input, as long as you can ignore any response that the program might be making on its standard output (and also error messages that the program might be sending to its standard error). A good example is given by the free plotting program gnuplot. (os.popen may also suffice when you need to obtain the output from a program that does not need to read its standard input.)

The statement f = os.popen('gnuplot', 'w') creates a file-like object connected to the standard input of the program it launches, namely 'gnuplot‘. (To try this recipe, you have to have gnuplot installed on your PATH, but since gnuplot is freely available and widely ported software, that should not be a problem!) Whatever we write to f, the external process receives on its standard input, just as would happen if we used that same program interactively. For more of the same, check out http://sourceforge.net/projects/gnuplot-py/: it’s a rich and interesting Python interface to gnuplot implemented entirely on the basis of the simple idea shown in this recipe!

When your needs are more sophisticated than os.popen can accommodate, you may want to look at os.popen2 and other such higher-numbered functions in module os, or, in Python 2.4, the new standard library module subprocess. However, in many cases, you’re likely to be disappointed: as soon as you get beyond the basics, driving (from your own programs) other external programs that were designed to be used interactively can become more than a little frustrating. Fortunately, a solution is at hand: it’s pexpect, a third-party Python module that you can find at http://pexpect.sourceforge.net/. pexpect is designed specifically for the task of driving other programs, and it lets you check on the other program’s responses as well as sending commands to the other program’s standard input. Still, while pexpect will most definitely offer you all the power you need, os.popen will probably suffice when you don’t need anything fancy!

See Also

Module os (specifically os.popen) in the Library Reference and Python in a Nutshell; gnuplot is at http://www.gnuplot.info/; gnuplot.py is at http://sourceforge.net/projects/gnuplot-py/; pexpect is at http://pexpect.sourceforge.net/.

9.12. Capturing the Output and Error Streams from a Unix Shell Command

Credit: Brent Burley, Bradey Honsinger, Tobias Polzin, Jonathan Cano, Padraig Brady

Problem

You need to run an external process in a Unix-like environment and capture both the output and error streams from that external process.

Solution

The popen2 module lets you capture both streams, but you also need help from module fcntl, to make the streams nonblocking and thus avoid deadlocks, and from module select, to orchestrate the action:

import os, popen2, fcntl, select
def makeNonBlocking(fd):
    fl = fcntl.fcntl(fd, os.F_GETFL)
    try:
        fcntl.fcntl(fd, os.F_SETFL, fl | os.O_NDELAY)
    except AttributeError:
        fcntl.fcntl(fd, os.F_SETFL, fl | os.FNDELAY)
def getCommandOutput(command):
    child = popen2.Popen3(command, 1) # Capture stdout and stderr from command
    child.tochild.close( )             # don't need to write to child's stdin
    outfile = child.fromchild
    outfd = outfile.fileno( )
    errfile = child.childerr
    errfd = errfile.fileno( )
    makeNonBlocking(outfd)            # Don't deadlock! Make fd's nonblocking.
    makeNonBlocking(errfd)
    outdata, errdata = [  ], [  ]
    outeof = erreof = False
    while True:
        to_check = [outfd]*(not outeof) + [errfd]*(not erreof)
        ready = select.select(to_check, [  ], [  ]) # Wait for input
        if outfd in ready[0]:
            outchunk = outfile.read( )
            if outchunk == '':
                outeof = True
            else:
                outdata.append(outchunk)
        if errfd in ready[0]:
            errchunk = errfile.read( )
            if errchunk == '':
                erreof = True
            else:
                errdata.append(errchunk)
        if outeof and erreof:
            break
        select.select([  ],[  ],[  ],.1) # Allow a little time for buffers to fill
    err = child.wait( )
    if err != 0:
        raise RuntimeError, '%r failed with exit code %d
%s' % (
            command, err, ''.join(errdata))
    return ''.join(outdata)
def getCommandOutput2(command):
    child = os.popen(command)
    data = child.read( )
    err = child.close( )
    if err:
        raise RuntimeError, '%r failed with exit code %d' % (command, err)

Discussion

This recipe shows how to execute a Unix shell command and capture the output and error streams in Python. By contrast, os.system sends both streams directly to the terminal. The function getCommandOutput presented in this recipe executes a command and returns the command’s output. If the command fails, getCommandOutput raises an exception, using the text captured from the command’s stderr as part of the exception’s arguments.

Most of the complexity of this code is due to the difficulty of capturing both the output and error streams of the child process independently and at the same time. Normal (blocking) read calls may deadlock if the child is trying to write to one stream, and the parent is waiting for data on the other stream; so, the streams must be set to nonblocking, and select must be used to wait for data on either of the streams.

Note that the second select call is included just to add a 0.1-second sleep after each read. Counter intuitively, this allows the code to run much faster, since it gives the child time to put more data in the buffer. Without it, the parent may try to read only a few bytes at a time, which can be very expensive. Calling time.sleep(0.1) should be exactly equivalent, but since I was already, necessarily, calling select.select elsewhere in the recipe’s code, I decided not to also import module time needlessly.

If you want to capture only the output and don’t mind the error stream going to the terminal, you can use the much simpler code presented in getCommandOutput2. If you want to suppress the error stream altogether, that’s easy, too—just append 2>/dev/null to the command. For example:

listing = getCommandOutput2('ls -1 2>/dev/null')

Another possibility is given by the os.popen4 function, which combines the output and error streams of the child process. However, in that case the streams are combined in a potentially messy way, depending on how they are buffered in the child process, so this recipe can help.

In Python 2.4, you can use class Popen, instead of popen2.Popen3, from the new standard library module subprocess. However, the issues highlighted in this recipe (namely, the need to use modules fcntl and select to make files nonblocking and coordinate the loop that interacts with the child process) aren’t really affected by whether you use popen2 or subprocess.

This recipe does, as advertised, require a rather Unix-like underlying platform. Cygwin, which does a generally great job of emulating Unix on top of Windows, is not sufficient; for example, it offers no way to set files to nonblocking mode, nor to select on general files. (Under Windows, you are allowed to select only on sockets, not on other files.) If you must run on such problematic, non-Unix platforms, you may prefer a very different approach, based on using temporary files:

import os, tempfile
def getCommandOutput(command):
    outfile = tempfile.mktemp( )
    errfile = tempfile.mktemp( )
    cmd = "( %s ) > %s 2> %s" % (command, outfile, errfile)
    err = os.system(cmd) >> 8
    try:
        if err != 0:
            raise RuntimeError, '%r failed with exit code %d
%s' % (
                command, err, file(errfile).read( ))
        return file(outfile).read( )
    finally:
        os.remove(outfile)
        os.remove(errfile)

See Also

Documentation of the standard library modules os, popen2, fcntl, select, and tempfile in the Library Reference and Python in a Nutshell; (Python 2.4 only) module subprocess in the Library Reference.

9.13. Forking a Daemon Process on Unix

Credit: Jürgen Hermann, Andy Gimblett, Josh Hoyt, Noah Spurrier, Jonathan Bartlett, Greg Stein

Problem

You need to fork a daemon process on a Unix or Unix-like system, which, in turn, requires a certain precise sequence of system calls.

Solution

Unix daemon processes must detach from their controlling terminal and process group. Doing so is not hard, but it does require some care, so it’s worth writing a daemonize.py module once and for all:

import sys, os
''' Module to fork the current process as a daemon.
    NOTE: don't do any of this if your daemon gets started by inetd!  inetd
    does all you need, including redirecting standard file descriptors;
    the chdir( ) and umask( ) steps are the only ones you may still want.
'''
def daemonize (stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
    ''' Fork the current process as a daemon, redirecting standard file
        descriptors (by default, redirects them to /dev/null).
    '''
    # Perform first fork.
    try:
        pid = os.fork( )
        if pid > 0:
            sys.exit(0) # Exit first parent.
    except OSError, e:
        sys.stderr.write("fork #1 failed: (%d) %s
" % (e.errno, e.strerror))
        sys.exit(1)
    # Decouple from parent environment.
    os.chdir("/")
    os.umask(0)
    os.setsid( )
    # Perform second fork.
    try:
        pid = os.fork( )
        if pid > 0:
            sys.exit(0) # Exit second parent.
    except OSError, e:
        sys.stderr.write("fork #2 failed: (%d) %s
" % (e.errno, e.strerror))
        sys.exit(1)
    # The process is now daemonized, redirect standard file descriptors.
    for f in sys.stdout, sys.stderr: f.flush( )
    si = file(stdin, 'r')
    so = file(stdout, 'a+')
    se = file(stderr, 'a+', 0)
    os.dup2(si.fileno( ), sys.stdin.fileno( ))
    os.dup2(so.fileno( ), sys.stdout.fileno( ))
    os.dup2(se.fileno( ), sys.stderr.fileno( ))
def _example_main ( ):
    ''' Example main function: print a count & timestamp each second '''
    import time
    sys.stdout.write('Daemon started with pid %d
' % os.getpid( ) )
    sys.stdout.write('Daemon stdout output
')
    sys.stderr.write('Daemon stderr output
')
    c = 0
    while True:
        sys.stdout.write('%d: %s
' % (c, time.ctime( )))
        sys.stdout.flush( )
        c = c + 1
        time.sleep(1)
if _ _name_ _ == "_ _main_ _":
    daemonize('/dev/null','/tmp/daemon.log','/tmp/daemon.log')
    _example_main( )

Discussion

Forking a daemon on Unix requires a certain specific sequence of system calls, which is explained in W. Richard Stevens’ seminal book, Advanced Programming in the Unix Environment (Addison-Wesley). We need to fork twice, terminating each parent process and letting only the grandchild of the original process run the daemon’s code. This allows us to decouple the daemon process from the calling terminal, so that the daemon process can keep running (typically as a server process without further user interaction, like a web server) even after the calling terminal is closed. The only visible effect of doing so is that when your script runs this module’s daemonize function, you get your shell prompt back immediately.

For all of the details about how and why this works in Unix and Unix-like systems, see Stevens’ wonderful book. Another important source of information on both practical and theoretical issues about “daemon forking” can be found as part of the Unix Programming FAQ, at http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16.

To summarize: the first fork lets the shell return, and also lets you do a setsid (to remove you from your controlling terminal, so you can’t accidentally be sent a signal). However, setsid makes this process a “session leader”, which means that if the process ever opens any terminal, it will become the process’ controlling terminal. We do not want a daemon to have any controlling terminal, which is why we fork again. After the second fork, the process is no longer a “session leader”, so it can open any file (including a terminal) without thereby accidentally reacquiring a controlling terminal.

Both Stevens and the Unix Programming FAQ provide examples in the C programming language, but since the Python Standard Library exposes a full POSIX interface, you can also do it all in Python. Typical C code for a daemon fork translates almost literally to Python; the only difference you have to care about—a minor detail—is that Python’s os.fork does not return -1 on errors, but rather throws an OSError exception. Therefore, rather than testing for a less-than-zero return code from fork, as we would in C, we run the fork in the try clause of a try/except statement, so that we can catch the exception, should it happen, and print appropriate diagnostics to standard error.

See Also

Documentation of the standard library module os in the Library Reference and Python in a Nutshell; Unix manpages for the fork, umask, and setsid system calls; W.Richard Stevens, Advanced Programming in the Unix Environment (Addison-Wesley); also, the Unix Programming FAQ on daemon forking, at http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset