Credit: Greg Wilson, Third Bit
Thirty years ago, in his classic The Mythical Man-Month: Essays on Software Engineering (Addison-Wesley), Fred Brooks drew a distinction between accidental and intrinsic complexity. Languages such as English and C++, with their inconsistent rules, exceptions, and special cases, are examples of the former: they make communication and programming harder than they need to be. Concurrency, on the other hand, is a prime example of the latter. Most people have to struggle to keep one chain of events straight in their minds; keeping track of two, three, or a dozen, plus all of their possible interactions, is just plain hard.
Computer scientists began studying ways of running multiple processes safely and efficiently in a single physical address space in the mid-1960s. Since then, a rich theory has been developed in which assertions about the behavior of interacting processes can be formalized and proved, and entire languages devoted to concurrent and parallel programming have been created. Foundations of Multithreaded, Parallel, and Distributed Programming, by Gregory R. Andrews (Addison-Wesley), is not only an excellent introduction to this theory, but also contains a great deal of historical information tracing the development of major ideas.
Over the past 20 years, opportunity and necessity have conspired to make concurrency a part of programmers’ everyday lives. The opportunity is for greater speed, which comes from the growing availability of multiprocessor machines. In the early 1980s, these were expensive curiosities; today, many programmers have dual-processor workstations on their desks and four-way or eight-way servers in the back room. If a calculation can be broken down into independent (or nearly independent) pieces, such machines can potentially solve it two, four, or eight times faster than a uniprocessor equivalent could. While the potential gains from this approach are limited, it works well for problems as diverse as image processing, serving HTTP requests, and recompiling multiple source files.
The necessity for concurrent programming comes from GUIs and network applications. Graphical interfaces often need to appear to be doing several things at once, such as displaying images while scrolling ads across the bottom of the screen. While it is possible to do the necessary interleaving manually, it is much simpler to code each operation on its own and let the underlying operating system decide on a concrete order of operations. Similarly, network applications often have to listen on several sockets at once or send data on one channel while receiving data on another.
Broadly speaking, operating systems give programmers two kinds of concurrency. Processes run in separate logical address spaces that are protected from each other. Threads, by contrast, execute simultaneously within the same program, in the same address space, without being protected from each other. Sharing an address space lowers overhead and makes communication between threads easier and faster; and since all threads run code from the same program, the lack of mutual protection adds no special security risks beyond those already present in a single-threaded program. For these reasons, concurrency used for performance purposes, particularly on multiprocessor machines, most often means adding threads to a single program.
However, adding threads to a Python program to speed it up is often not a successful strategy. The reason is the Global Interpreter Lock (GIL), which protects Python’s internal data structures. This lock must be held by a thread before the thread can safely access Python objects. Without the lock, even simple operations (such as incrementing an integer) could fail. Therefore, only the thread with the GIL can manipulate Python objects or call Python/C API functions.
To make life easier for programmers, the interpreter
releases and reacquires the lock every 100 bytecode instructions (a
value that can be changed using sys.setcheckinterval
). The lock is also
released and reacquired around I/O operations, such as reading or
writing a file, so that other threads can run while the thread that
requests the I/O is waiting for the I/O operation to complete. However,
effective performance-boosting exploitation of multiple processors from
multiple pure-Python threads of the same process is just not in the
cards. Unless the CPU performance bottlenecks in your Python application
are in C-coded extensions that release the GIL, you will not observe
substantial performance increases by moving your multithreaded
application to a multiprocessor machine.
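For instance, here is a minimal sketch of inspecting and tuning the check interval (whether a larger interval actually helps is workload-dependent, so the value below is purely illustrative):

import sys
print sys.getcheckinterval()    # the default is 100 bytecode instructions
sys.setcheckinterval(1000)      # switch threads (and check for signals) less often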
However, threading is not just about performance on
multiprocessor machines. A GUI can’t know when the user will press a key
or move the mouse, and an HTTP server can’t know which request will
arrive next. Handling each stream of events with a separate control
thread is therefore often the simplest way to cope with this
unpredictability, even on single-processor machines, and when high
throughput is not an overriding concern. Of course, event-driven
programming can often be used in these kinds of applications as well,
and Python frameworks such as asyncore
and Twisted are proof that this
approach can often deliver excellent performance with complexity that,
while different from that inherent in multithreading, is not necessarily
any more difficult to deal with.
The standard Python library allows programmers to approach
multithreaded programming at two different levels. The core module,
thread
, is a thin wrapper around the
basic primitives that any threading library must provide. Three of these
primitives are used to create, identify, and end threads; others are
used to create, test, acquire, and release simple mutual-exclusion locks
(or binary semaphores). As the recipes in this section demonstrate,
programmers should avoid using these primitives directly, and should
instead use the tools included in the higher-level threading
module, which is substantially more
programmer-friendly and has similar performance
characteristics.
Whether you use thread
or
threading
, some underlying aspects of
Python’s threading model stay the same. The GIL, in particular, works
just the same either way. The crucial advantage of the GIL is that it
makes it much easier to code Python extensions in C: unless your C
extension explicitly releases the GIL, you know thread switches won’t
happen until your C code calls back into Python code. This advantage can
be really important when your extension makes available to Python some
underlying C library that isn’t thread-safe. If your C code
is thread-safe, though, you can and should release
the GIL around stretches of computational or I/O operations that can
last for a substantial time without needing to make Python C API calls;
when you do this, you make it possible for Python programs using your C
extension to take advantage of more than one processor from multiple
threads within the same process. Make sure you acquire the GIL again
before calling any Python C API entry point, though!
Any time your code wants to access a data structure that
is shared among threads, you may have to wonder whether a given
operation is atomic, meaning that no thread
switch can happen during the operation. In general,
anything with multiple bytecodes is not atomic, since a thread switch
might always happen between one bytecode and the
next (you can use the standard library function dis.dis
to disassemble Python code into
bytecodes). Moreover, even a single bytecode is not atomic, if it can
call back to arbitrary Python code (e.g., because that bytecode can end
up executing a Python-coded special method). When in doubt, it is most
prudent to assume that whatever is giving you doubts is
not atomic: so, reduce to the bare minimum the data
structures accessed by more than one thread (except for instances of
Queue.Queue
, a class that is
specifically designed to be thread-safe!), and make sure you protect
with locks any access to any such structures that remain.
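For example, you can use dis.dis to see that even a simple augmented assignment compiles to several bytecodes (the exact opcodes listed vary across Python versions), so a thread switch can happen in its middle:

import dis

def bump(obj):
    obj.count += 1

# the += line spans several bytecodes (load, add, store, ...), and a
# thread switch may occur between any two of them, so obj needs a lock
# if it is shared among threads
dis.dis(bump)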
Almost invariably, the proper idiom to use some lock is:
somelock.acquire()
try:
    # operations needing the lock (keep to a minimum!)
finally:
    somelock.release()
The try
/finally
construct ensures the lock will be
released even if some exception happens in the code in the try
clause. Accidentally failing to release a
lock, due to some unforeseen exception, could soon make all of your
application come to a grinding halt. Also, be careful acquiring more
than one lock in sequence; if you really truly need to do such multiple
acquisitions, make sure all possible paths through the code acquire the
various locks in the same sequence. Otherwise,
you’re likely sooner or later to enter the disaster case in which two
threads are each trying to acquire a lock held by the other—a situation
known as deadlock, which does mean that your
program is as good as dead.
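Here is a minimal sketch of that consistent-ordering rule (the two locks and the function are hypothetical names, purely for illustration):

import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def do_both():
    # every code path acquires lock_a first, then lock_b; as long as
    # all threads respect this ordering, this pair cannot deadlock
    lock_a.acquire()
    try:
        lock_b.acquire()
        try:
            pass    # operations needing both locks (keep to a minimum!)
        finally:
            lock_b.release()
    finally:
        lock_a.release()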
The most important elements of the threading
module are classes that represent
threads and various high-level synchronization constructs. The Thread
class represents a separate control
thread; it can be told what to do by passing a callable object to its
constructor, or, alternatively, by overriding its run
method. One thread can start another by
calling its start
method, and wait
for it to complete by calling join
.
Python also supports daemon threads, which do background processing
until all of the nondaemon threads in the program exit and then shut
themselves down automatically.
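Here is a tiny sketch of both ways of arranging for a Thread’s work, plus start and join (worker and Greeter are made-up names for illustration):

import threading

def worker(message):
    print 'worker says:', message

# one way: pass a callable (and its arguments) to the constructor
t = threading.Thread(target=worker, args=('hello',))
t.start()
t.join()           # wait for the thread to complete

# another way: override the run method in a subclass
class Greeter(threading.Thread):
    def run(self):
        print 'greetings from', self.getName()

g = Greeter()
g.setDaemon(True)  # daemon threads don't keep the program alive on their own
g.start()
g.join()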
The synchronization constructs in the threading
module include locks, reentrant
locks (which a single thread can safely relock many times without
deadlocking), counting semaphores, conditions, and
events. Events can be used by one thread to signal others that something
interesting has happened (e.g., that a new item has been added to a
queue, or that it is now safe for the next thread to modify a shared
data structure). The documentation that comes with Python, specifically
the Library Reference manual, describes each of
these classes in detail.
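For instance, here is a minimal sketch of one thread using an Event to signal another (the names are illustrative):

import threading

data_ready = threading.Event()

def consumer():
    data_ready.wait()      # blocks until some other thread calls set()
    print 'safe to read the shared data now'

t = threading.Thread(target=consumer)
t.start()
# ... the main thread prepares the shared data, then signals:
data_ready.set()
t.join()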
The relatively low number of recipes in this chapter, compared to
some other chapters in this cookbook, reflects both Python’s focus on
programmer productivity (rather than absolute performance) and the
degree to which other packages (such as httplib
and wxPython
) hide the unpleasant details of
concurrency in important application areas. This relative scarcity also
reflects many Python programmers’ tendencies to look for the simplest
way to solve any particular problem, which complex threading rarely
is.
However, this chapter’s brevity may also reflect the Python
community’s underappreciation of the potential of simple threading, when
used appropriately, to simplify a programmer’s life. The Queue
module in particular supplies a
delightfully self-contained (and yet extensible and customizable!)
synchronization and cooperation structure that can provide all the
interthread supervision services you need. Consider a typical program,
which accepts requests from a GUI (or from the network). As a “result”
of such requests, the program will often find itself faced with the
prospect of having to perform a substantial chunk of work. That chunk
might take so long to perform all at once that, unless some precautions
are taken, the program would appear unresponsive to the GUI (or
network).
In a purely event-driven architecture, it may take
considerable effort on the programmer’s part to slice up such a hefty
work-chunk into slices of work thin enough that each slice can be
performed in idle time, without ever giving the appearance of
unresponsiveness. In cases such as this one, just a dash of
multithreading can help considerably. The main thread pushes a work
request describing the substantial chunk of background work onto a
dedicated Queue
instance, then goes
back to its task of making the program’s interface responsive at all
times.
At the other end of the Queue
,
a pool of daemonic worker threads waits, each thread ready to peel a work
request off the Queue
and run it
straight through. This kind of overall architecture combines
event-driven and multithreaded approaches in the overarching ideal of
simplicity and is thus maximally Pythonic. You may need just a little
bit more work if the result of a worker thread’s efforts must be
presented again to the main thread (via another Queue
, of course), which is normally the case
with GUIs. If you’re willing to cheat just a little, and use polling for the mostly event-driven main thread to access the result Queue back from the daemonic worker threads, see Recipe 11.9 to get an idea of how simple that little bit of work can be.
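In outline, such a polling arrangement might look like the following sketch (handle_result is a hypothetical GUI-update helper; the periodic calls would come from your toolkit’s timer facility, such as Tkinter’s after method):

import Queue

result_queue = Queue.Queue()

def poll_results():
    ''' called periodically from the main thread's event loop '''
    try:
        while True:
            result = result_queue.get_nowait()   # never blocks the GUI
            handle_result(result)                # hypothetical: update the GUI
    except Queue.Empty:
        pass    # no results ready right now; try again on the next tick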
Credit: André Bjärb, Alex Martelli, Radovan Chytracek
You want to share an object among multiple threads, but, to avoid conflicts, you need to ensure that only one thread at a time is inside the object—possibly excepting some methods for which you want to hand-tune locking behavior.
Java offers such synchronization as a built-in feature, while in Python you have to program it explicitly by wrapping the object and its methods. Wrapping is so general and useful that it deserves to be factored out into general tools:
def wrap_callable(any_callable, before, after):
    ''' wrap any callable with before/after calls '''
    def _wrapped(*a, **kw):
        before()
        try:
            return any_callable(*a, **kw)
        finally:
            after()
    # In 2.4, only: _wrapped.__name__ = any_callable.__name__
    return _wrapped

import inspect

class GenericWrapper(object):
    ''' wrap all of an object's methods with before/after calls '''
    def __init__(self, obj, before, after, ignore=()):
        # we must set into __dict__ directly to bypass __setattr__; so,
        # we need to reproduce the name-mangling for double-underscores
        clasname = 'GenericWrapper'
        self.__dict__['_%s__methods' % clasname] = {}
        self.__dict__['_%s__obj' % clasname] = obj
        for name, method in inspect.getmembers(obj, inspect.ismethod):
            if name not in ignore and method not in ignore:
                self.__methods[name] = wrap_callable(method, before, after)
    def __getattr__(self, name):
        try:
            return self.__methods[name]
        except KeyError:
            return getattr(self.__obj, name)
    def __setattr__(self, name, value):
        setattr(self.__obj, name, value)
Using these simple but general tools, synchronization becomes easy:
class SynchronizedObject(GenericWrapper):
    ''' wrap an object and all of its methods with synchronization '''
    def __init__(self, obj, ignore=(), lock=None):
        if lock is None:
            import threading
            lock = threading.RLock()
        GenericWrapper.__init__(self, obj, lock.acquire, lock.release, ignore)
As per usual Python practice, we can complete this module with a small self-test, executed only when the module is run as main script. This snippet also serves to show how the module’s functionality can be used:
if __name__ == '__main__':
    import threading
    import time
    class Dummy(object):
        def foo(self):
            print 'hello from foo'
            time.sleep(1)
        def bar(self):
            print 'hello from bar'
        def baaz(self):
            print 'hello from baaz'
    tw = SynchronizedObject(Dummy(), ignore=['baaz'])
    threading.Thread(target=tw.foo).start()
    time.sleep(0.1)
    threading.Thread(target=tw.bar).start()
    time.sleep(0.1)
    threading.Thread(target=tw.baaz).start()
Thanks to the synchronization, the call to
bar
runs only when the call to
foo
has completed. However, because of the
ignore=
keyword argument, the call to
baaz
bypasses synchronization and thus
completes earlier. So the output is:
hello from foo
hello from baaz
hello from bar
When you find yourself using the same single-lock locking code in almost every method of an object, use this recipe to refactor the locking away from the object’s application-specific logic. The key effect you get by applying this recipe is to effectively replace each method with:
self.lock.acquire()
try:
    # The "real" application code for the method
finally:
    self.lock.release()
This code idiom is, of course, the right way to express locking:
the try
/finally
statement ensures that the lock gets
released in any circumstance, whether the application code terminates
correctly or raises an exception. You’ll note that factory
wrap_callable
returns a closure, which is carefully
coded in exactly this way!
To some extent, this recipe can also be handy when you want to
postpone worrying about a class’ locking behavior. However, if you
intend to use this code for production purposes, you should understand
all of it. In particular, this recipe does not
wrap direct accesses (for getting or setting) to the object’s
attributes. If you want such direct accesses to respect the object’s
lock, you need to add the try
/finally
locking idiom to the wrapper’s
_ _getattr_ _
and _ _setattr_ _
special methods, around the
calls these methods make to the getattr
and setattr
built-in functions, respectively. I
normally don’t find that depth of wrapping to be necessary in my
applications. (The way I code, wrapping just the methods proves
sufficient.)
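If you ever do need that depth of wrapping, a subclass along the following lines might serve as a starting point. This is only a sketch, not part of the recipe; it assumes SynchronizedObject and GenericWrapper are in scope, and note how it must reach its lock through __dict__ to bypass its own __setattr__:

import threading

class DeeplySynchronizedObject(SynchronizedObject):
    ''' hypothetical variant: also serialize direct attribute access '''
    def __init__(self, obj, ignore=(), lock=None):
        if lock is None:
            lock = threading.RLock()
        # set via __dict__ directly, to bypass our own __setattr__
        self.__dict__['_lock'] = lock
        SynchronizedObject.__init__(self, obj, ignore, lock)
    def __getattr__(self, name):
        self._lock.acquire()
        try:
            return SynchronizedObject.__getattr__(self, name)
        finally:
            self._lock.release()
    def __setattr__(self, name, value):
        self._lock.acquire()
        try:
            SynchronizedObject.__setattr__(self, name, value)
        finally:
            self._lock.release()

Since the lock is a reentrant RLock, a wrapped method fetched through __getattr__ can safely reacquire the same lock when it runs.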
If you’re into custom metaclasses, you may be surprised that I do not offer a metaclass for these synchronization purposes. However, wrapping is a more dynamic and flexible approach—for example, an object can exist in both wrapped (synchronized) and unwrapped (raw) incarnations, and you can use the most appropriate one case by case. You pay for wrapping’s flexibility with a little bit more runtime overhead at each method call, but compared to the large costs of acquiring and releasing locks I don’t think this tiny extra overhead matters. Meanwhile, this recipe shows off, and effectively reuses, a wrapper-closure factory and a wrapper class that demonstrate how easy Python makes it to implement that favorite design pattern of Aspect-Oriented Programming’s fans, the insertion of “before-and-after” calls around every call to an object’s methods.
Documentation of the standard library modules threading
and inspect
in the Library
Reference and Python in a
Nutshell.
Credit: Doug Fort
You must terminate a thread from the outside, but Python doesn’t let one thread just brutally kill another, so you need to use a suitable controlled-termination idiom.
A frequently asked question is: How do I kill a thread? The answer is: You don’t. Instead, you kindly ask it to go away. Each thread must periodically check whether it’s been asked to go away and then comply (typically after some kind of cleanup). Here is an example:
import threading

class TestThread(threading.Thread):
    def __init__(self, name='TestThread'):
        """ constructor, setting initial variables """
        self._stopevent = threading.Event()
        self._sleepperiod = 1.0
        threading.Thread.__init__(self, name=name)
    def run(self):
        """ main control loop """
        print "%s starts" % (self.getName(),)
        count = 0
        while not self._stopevent.isSet():
            count += 1
            print "loop %d" % (count,)
            self._stopevent.wait(self._sleepperiod)
        print "%s ends" % (self.getName(),)
    def join(self, timeout=None):
        """ Stop the thread and wait for it to end. """
        self._stopevent.set()
        threading.Thread.join(self, timeout)

if __name__ == "__main__":
    testthread = TestThread()
    testthread.start()
    import time
    time.sleep(5.0)
    testthread.join()
You often want to exert some control on a thread from the
outside, but the ability to kill a thread is, well, overkill. Python
doesn’t give you this ability, and thus forces you to design your
thread systems more carefully. This recipe is based on the idea of a
thread whose main function uses a loop. Periodically, the loop checks
if a threading.Event
object has
been set. If so, the thread terminates; otherwise, it waits for the
object.
The TestThread
class in this recipe
also overrides threading.Thread
’s
join
method. Normally, join
waits only for a certain thread to
terminate (for up to a specified amount of time, if any) without doing
anything to cause that termination. In this
recipe, however, join
is overridden
to set the stop event object before delegating the rest of its
operation to the normal (base class) join
method. Therefore, in this recipe, the
join
call is guaranteed to
terminate the target thread within a short amount of time.
You can use the recipe’s central idea (a loop periodically
checking a threading.Event
to
determine whether it must terminate) in several other, slightly
different ways. The Event
’s
wait
method can let you pause the
target thread. You can also expose the Event
, letting controller code set
it and then go on its merry way without
bothering to join
the thread,
knowing the thread will terminate in a short amount of time. Once the
event is exposed, you may choose to use the same event to request the
termination of more than one thread—for example, all threads in a
certain thread pool might stop when one event object they all share is
set. The simplicity of this recipe provides the modest amount of
control I need, with no headaches, so I haven’t pursued the more
sophisticated (and complicated) ideas.
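For instance, exposing a single Event shared by a whole pool might look like this sketch (the names are illustrative, built on this recipe’s central idea):

import threading

stop_event = threading.Event()

def pool_worker(period=1.0):
    while not stop_event.isSet():
        # ... do one short chunk of work here ...
        stop_event.wait(period)   # doubles as an interruptible sleep

pool = [threading.Thread(target=pool_worker) for i in range(4)]
for t in pool:
    t.start()
# ... later, controller code stops the whole pool with a single call:
stop_event.set()
for t in pool:
    t.join()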
Python also lets you terminate a thread in another way: by
raising an exception in that thread. This “rougher” approach also has
its limits: it cannot interrupt a blocking call to the operating
system, and it could fail to work if the thread you want to terminate
is executing a try
clause whose
except
clauses are too broad.
Despite its limits, this approach can still sometimes be useful, when
you’re essentially writing a debugger: that is, when you cannot count
on the code executing in the target thread to be well written, but you
can hope the code is not written in an utterly disastrous way. The
normal way to make use of this functionality is by running the
possibly-buggy code in the main thread, after spawning a separate
monitoring thread to keep an eye on things. If the monitoring thread
decides the time has come to terminate the code that is currently
running in the main thread, the monitoring thread can call thread.interrupt_main, which takes no arguments and raises a KeyboardInterrupt exception in the main thread.
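A bare-bones sketch of that monitoring arrangement follows (the five-second budget and the loop standing in for the code under scrutiny are arbitrary, for illustration only):

import thread, threading, time

def monitor(seconds):
    ''' ask the main thread to stop after a time budget elapses '''
    time.sleep(seconds)
    thread.interrupt_main()    # raises KeyboardInterrupt in the main thread

threading.Thread(target=monitor, args=(5.0,)).start()
try:
    while True:                # stands in for the possibly-buggy code
        time.sleep(0.1)
except KeyboardInterrupt:
    print 'stopped by the monitoring thread'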
Once in a blue moon, the debugger you’re writing cannot
run the possibly-buggy code in the process’ main thread, typically
because that thread is required for other uses by some other framework
you depend on, such as your GUI code. To support such remote
eventualities, the Python interpreter has a function that can raise an
exception in any thread, given the target thread’s ID. However, this
specialized functionality is intended for a tiny subset of that tiny
subset of Python applications that are debuggers. To avoid tempting
all other Python programmers (well over 99.9%) into misusing this
approach for any other case of thread termination, the function is not
directly callable from Python code: rather, the function is only
exposed as a part of Python’s C API. This special function’s name is
PyThreadState_SetAsyncExc
, and the
function’s two arguments are the target thread’s ID and the class of
the desired exception. If you are writing a Python debugger with such
peculiar needs, no doubt you already have, as part of your code, at
least one C-coded Python extension module that supplies to your
higher-level Python code other tidbits of peculiar, low-level
functionality. Just add to your C code a Python-callable function
that in turn calls PyThreadState_SetAsyncExc
, and your debugger
will gain this peculiar but useful functionality.
Documentation of the standard library module threading
in the Library
Reference and Python in a
Nutshell.
Credit: Simo Salminen, Lee Harr, Mark Moraes, Chris Perkins, Greg Klanderman
You want to use a Queue.Queue
instance, since it is the best
way to communicate among threads. However, you need the additional
functionality of being able to specify a priority value associated
with each item on the queue, so that items with a lower (more urgent)
priority value are fetched before others with a higher (less urgent)
priority value.
Among its many advantages, Queue.Queue
offers an elegant architecture
that eases subclassing for purposes of specializing queueing behavior.
Specifically, Queue.Queue
exposes
several methods specifically designed to be overridden in a subclass,
to get specialized queueing behavior without worrying about
synchronization issues.
We can exploit this elegant architecture and module
heapq
from the Python Standard
Library to build the needed priority-queue functionality pretty
easily. However, we also need to shadow and wrap Queue.Queue
’s put
and get
methods, to decorate each item with its
priority and posting time upon put
,
and strip off these decorations upon get
:
import Queue, heapq, time

class PriorityQueue(Queue.Queue):
    # Initialize the queue
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = []
    # Return the number of items that are currently enqueued
    def _qsize(self):
        return len(self.queue)
    # Check whether the queue is empty
    def _empty(self):
        return not self.queue
    # Check whether the queue is full
    def _full(self):
        return self.maxsize > 0 and len(self.queue) >= self.maxsize
    # Put a new item in the queue
    def _put(self, item):
        heapq.heappush(self.queue, item)
    # Get an item from the queue
    def _get(self):
        return heapq.heappop(self.queue)
    # shadow and wrap Queue.Queue's own `put' to allow a 'priority' argument
    def put(self, item, priority=0, block=True, timeout=None):
        decorated_item = priority, time.time(), item
        Queue.Queue.put(self, decorated_item, block, timeout)
    # shadow and wrap Queue.Queue's own `get' to strip auxiliary aspects
    def get(self, block=True, timeout=None):
        priority, time_posted, item = Queue.Queue.get(self, block, timeout)
        return item
Given an instance q
of this
recipe’s PriorityQueue
class, you can call q.put(anitem)
to enqueue an item with
“normal” priority (here defined as 0
), or q.put(anitem, prio)
to enqueue an item with
a specific priority prio
. At the time
q.get()
gets called (presumably in
another thread), items with the lowest priority will be returned
first, bypassing items with higher priority. Negative priorities are
lower than “normal”, thus suitable for “urgent” items; positive
priorities, higher than “normal”, indicate items that may wait longer,
since other items with “normal” priority will get fetched before them.
Of course, if you’re not comfortable with this conception of
priorities, nothing stops you from altering this recipe’s code
accordingly: for example, by changing sign to the priority value when
you build the decorated_item
at the start of method
put
. If you do so, items posted
with positive priority will become the urgent ones and items posted
with negative priority will become the can-wait-longer ones.
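For instance, a quick single-threaded smoke test (assuming the PriorityQueue class shown above is in scope) might go:

q = PriorityQueue()
q.put('routine cleanup')          # "normal" priority, 0
q.put('server is down!', -2)      # urgent: negative priority
q.put('monthly report', 5)        # can wait: positive priority
print q.get()                     # emits: server is down!
print q.get()                     # emits: routine cleanup
print q.get()                     # emits: monthly report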
Queue.Queue
’s architecture
deserves study, admiration, and imitation. Not only is Queue.Queue
, all on its own, the best way to
architect communication among threads, but this same class is also
designed to make it easy for you to subclass and specialize it with
queueing disciplines different from its default FIFO (first-in,
first-out), such as the priority-based queueing discipline implemented
in this recipe. Specifically, Queue.Queue
uses the wonderful Template
Method Design Pattern (http://www.aleax.it/Python/os03_template_dp.pdf
). This DP enables Queue.Queue
itself to take care of the delicate problems connected with locking,
while delegating the queueing discipline to specific methods _put
, _get
, and so on, which may be overridden by
subclasses; such hook methods
then get called in a context where synchronization issues are not a
concern.
In this recipe, we also need to override Queue.Queue
’s put
and get
methods, because we need to add a
priority
optional argument to put
’s signature, decorate the
item
before we put it on the queue (so that the
heapq
module’s mechanisms will
produce the order we want—lowest priority first, and, among items
posted with equal priority, FIFO ordering), and undecorate each
decorated item that we get back from the queue to return the naked
item
. All of these auxiliary tweaks use nothing but
local variables, however, so they introduce no synchronization worries
whatsoever. Each thread gets its own stack; therefore, any code that
uses nothing but local variables (and thus cannot possibly alter any
state accessible from other threads, or access any state that other
threads might alter) is inherently thread-safe.
Modules Queue
and heapq
of the Python Standard Library are
documented in the Library Reference and
Python in a Nutshell; the Template Method
Design Pattern is illustrated at http://www.strakt.com/docs/os03_template_dp.pdf;
Recipe 19.14, and
Recipe 5.7, show other
examples of coding and using priority queues.
Credit: John Nielsen, Justin A
You want your main thread to be able to farm out processing tasks to a pool of worker threads.
The Queue.Queue
type
is the simplest and most effective way to coordinate a pool of worker
threads. We could group all the needed data structures and functions
into a class, but there’s no real need to. So, here they are, shown as
globals instead:
import threading, Queue, time, sys

# Globals (start with a capital letter)
Qin  = Queue.Queue()
Qout = Queue.Queue()
Qerr = Queue.Queue()
Pool = []

def report_error():
    ''' we "report" errors by adding error information to Qerr '''
    Qerr.put(sys.exc_info()[:2])

def get_all_from_queue(Q):
    ''' generator to yield one after the others all items currently
        in the Queue Q, without any waiting '''
    try:
        while True:
            yield Q.get_nowait()
    except Queue.Empty:
        raise StopIteration

def do_work_from_queue():
    ''' the get-some-work, do-some-work main loop of worker threads '''
    while True:
        command, item = Qin.get()       # implicitly stops and waits
        if command == 'stop':
            break
        try:
            # simulated work functionality of a worker thread
            if command == 'process':
                result = 'new' + item
            else:
                raise ValueError, 'Unknown command %r' % command
        except:
            # unconditional except is right, since we report _all_ errors
            report_error()
        else:
            Qout.put(result)

def make_and_start_thread_pool(number_of_threads_in_pool=5, daemons=True):
    ''' make a pool of N worker threads, daemonize, and start all of them '''
    for i in range(number_of_threads_in_pool):
        new_thread = threading.Thread(target=do_work_from_queue)
        new_thread.setDaemon(daemons)
        Pool.append(new_thread)
        new_thread.start()

def request_work(data, command='process'):
    ''' work requests are posted as (command, data) pairs to Qin '''
    Qin.put((command, data))

def get_result():
    return Qout.get()       # implicitly stops and waits

def show_all_results():
    for result in get_all_from_queue(Qout):
        print 'Result:', result

def show_all_errors():
    for etyp, err in get_all_from_queue(Qerr):
        print 'Error:', etyp, err

def stop_and_free_thread_pool():
    # order is important: first, request all threads to stop...:
    for i in range(len(Pool)):
        request_work(None, 'stop')
    # ...then, wait for each of them to terminate:
    for existing_thread in Pool:
        existing_thread.join()
    # clean up the pool from now-unused thread objects
    del Pool[:]
It is generally a mistake to architect a multithreading program
on the premise of having it spawn arbitrarily high numbers of threads
as needed. Most often, the best architecture for such a program is
based on farming out work to a fixed and relatively small number of
worker threads—an arrangement known as a
thread pool. This recipe shows a very simple
example of a thread pool, focusing on the use of Queue.Queue
instances as the most useful and
simplest way for inter-thread communication and
synchronization.
In this recipe, worker threads run function
do_work_from_queue
, which has the right structure for
a typical worker thread but does really minimal “processing” (just as
an example). In this case, the worker thread computes a “result” by
prepending the string 'new' to each
arriving item (note that this implicitly assumes that arriving items
are strings). In your applications, of course, you will have, in the
equivalent of this do_work_from_queue
function, more
substantial processing, and quite possibly different kinds of
processing depending on the value of the command
parameter.
In addition to the worker threads in the pool, a multithreading program often has other specialized threads for various purposes, such as interfacing to various entities external to the program (a GUI, a database, a library that is not guaranteed to be thread-safe). In this recipe, such specialized threads are not shown. However, it does include at least a “main thread”, which starts and stops the thread pool, determines the units of work to be farmed out, and eventually gathers all results and any errors that may have been reported.
In your applications, you may or may not want to start and stop
the thread pool repeatedly. Most typically, you may start the pool as
a part of your program’s initialization, leave it running throughout,
and stop it, if at all, only as a part of your program’s final
cleanup. If you set your worker threads as “daemons”, as this recipe’s
function make_and_start_thread_pool
sets them by
default, it means that your program will not continue running when
only worker threads are left. Rather, your program will terminate as
soon as the main thread terminates. Again, this arrangement is a
typically advisable architecture. At any rate, the recipe also
provides a function stop_and_free_thread_pool
, just
in case you do want to terminate and clean up
your thread pool at some point (and possibly later make and restart
another one with another call to
make_and_start_thread_pool
).
An example use of the functionality in this recipe might be:
for i in ('_ba', '_be', '_bo'):
    request_work(i)
make_and_start_thread_pool()
stop_and_free_thread_pool()
show_all_results()
show_all_errors()
The output from this snippet should normally be:
Result: new_ba
Result: new_be
Result: new_bo
although it’s possible (but quite unlikely) that two of the results might end up exchanged. (If ordering of results is important to you, be sure to add a progressive number to the work requests you post from the main thread, and report it back to the main thread as part of each result or error.)
Here is a case where an error occurs and gets reported:
for i in ('_ba', 7, '_bo'):
    request_work(i)
make_and_start_thread_pool()
stop_and_free_thread_pool()
show_all_results()
show_all_errors()
The output from this snippet should normally be (net of an extremely unlikely, but not impossible, exchange between the two “Result” lines):
Result: new_ba
Result: new_bo
Error: exceptions.TypeError cannot concatenate 'str' and 'int' objects
The worker thread that gets the item 7
reports a TypeError
because it tries to concatenate
the string 'new' with this item,
which is an int
—an invalid
operation. Not to worry: we have the try
/except
statement in function
do_work_from_queue
exactly to catch any kind of
error, and Queue
Qerr
and functions report_error
and
show_all_errors
exactly to ensure that errors do not
pass silently, unless explicitly silenced, which is a key point of
Python’s general approach to programming.
Library Reference docs on threading
and Queue
modules; Python in a
Nutshell chapter on threads.
Credit: Guy Argo
You want to execute a function simultaneously over multiple sets of arguments. (Presumably the function is “I/O bound”, meaning it spends substantial time doing input/output operations; otherwise, simultaneous execution would be useless.)
Use one thread for each set of arguments. For good performance, it’s best to limit our use of threads to a bounded pool:
import threading, time, Queue

class MultiThread(object):
    def __init__(self, function, argsVector, maxThreads=5, queue_results=False):
        self._function = function
        self._lock = threading.Lock()
        self._nextArgs = iter(argsVector).next
        self._threadPool = [threading.Thread(target=self._doSome)
                            for i in range(maxThreads)]
        if queue_results:
            self._queue = Queue.Queue()
        else:
            self._queue = None
    def _doSome(self):
        while True:
            self._lock.acquire()
            try:
                try:
                    args = self._nextArgs()
                except StopIteration:
                    break
            finally:
                self._lock.release()
            result = self._function(args)
            if self._queue is not None:
                self._queue.put((args, result))
    def get(self, *a, **kw):
        if self._queue is not None:
            return self._queue.get(*a, **kw)
        else:
            raise ValueError, 'Not queueing results'
    def start(self):
        for thread in self._threadPool:
            time.sleep(0)    # necessary to give other threads a chance to run
            thread.start()
    def join(self, timeout=None):
        for thread in self._threadPool:
            thread.join(timeout)

if __name__ == "__main__":
    import random
    def recite_n_times_table(n):
        for i in range(2, 11):
            print "%d * %d = %d" % (n, i, n * i)
            time.sleep(0.3 + 0.3*random.random())
    mt = MultiThread(recite_n_times_table, range(2, 11))
    mt.start()
    mt.join()
    print "Well done kids!"
This recipe’s MultiThread
class offers a simple
way to execute a function in parallel, on many sets of arguments,
using a bounded pool of threads. Optionally, you can ask for results
of the calls to the function to be queued, so you can retrieve them,
but by default the results are just thrown away.
The MultiThread
class takes as its arguments a
function, a sequence of argument tuples for said function, and
optionally a boundary on the number of threads to use in its pool and
an indicator that results should be queued. Beyond the constructor, it
exposes three methods: start
, to start all the
threads in the pool and begin the parallel evaluation of the function
over all argument tuples; join
, to perform a join on
all threads in the pool (meaning to wait for all the threads in the
pool to have terminated); and get
, to get queued
results (if it was instantiated with the optional flag
queue_results
set to True
, to ask for results to be queued).
Internally, class MultiThread
uses its private method
_doSome as the target callable for all threads in the
as the target callable for all threads in the
pool. Each thread works on the next available tuple of arguments
(supplied by the next
method of an
iterator on the iterable whose items are such tuples, with the call to
next
being guarded by the usual
locking idiom), until all work has been completed.
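For instance, to collect results rather than discard them, you might run something like this sketch (it assumes the MultiThread class above is in scope; slow_double merely simulates an I/O-bound function with a sleep):

import time

def slow_double(n):
    time.sleep(0.5)          # stands in for a blocking I/O operation
    return n * 2

mt = MultiThread(slow_double, [1, 2, 3, 4], maxThreads=2, queue_results=True)
mt.start()
mt.join()
for i in range(4):
    args, result = mt.get()  # results arrive in completion order
    print args, '->', result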
As is usual in Python, the module can also be run as a free-standing main script, in which case it runs a simple demonstration and self-test. In this case, the demonstration simulates a class of schoolchildren reciting multiplication tables as fast as they can.
Real use cases for this recipe mostly involve functions that are I/O bound, meaning functions that spend substantial time performing I/O. If a function is “CPU bound”, meaning the function spends its time using the CPU, you get better overall performance by performing the computations one after the other, rather than in parallel. In Python, this observation tends to hold even on machines that dedicate multiple CPUs to your program, because Python uses a GIL (Global Interpreter Lock), so that pure Python code from a single process does not run simultaneously on more than one CPU at a time.
Input/output operations release the GIL, and so can (and should) any C-coded Python extension that performs substantial computations without callbacks into Python. So, it is possible that parallel execution may speed up your program, but only if either I/O or a suitable C-coded extension is involved, rather than pure computationally intensive Python code. (Implementations of Python on different virtual machines, such as Jython, which runs on a JVM [Java Virtual Machine], or IronPython, which runs on the Microsoft .NET runtime, are of course not bound by these observations: these observations apply only to the widespread “classical Python”, meaning CPython, implementation.)
Library Reference and Python
in a Nutshell docs on modules threading
and Queue
.
Credit: Michael Hobbs
You want to write a multithreaded application, using, as the synchronization and communication primitive, a simple yet powerful message-passing paradigm.
The candygram
module
lets you use concurrent programming semantics that are essentially
equivalent to those of the Erlang language. To use candygram
, you start by defining appropriate
classes, such as the following one, to model your threads’
functionality:
import candygram as cg

class ExampleThread(object):
    """A thread-class with just a single counter value and a stop flag."""
    def __init__(self):
        """ Initialize the counter to 0, the running-flag to True. """
        self.val = 0
        self.running = True
    def increment(self):
        """ Increment the counter by one. """
        self.val += 1
    def sendVal(self, msg):
        """ Send current value of counter to requesting thread. """
        req = msg[0]
        req.send((cg.self(), self.val))
    def setStop(self):
        """ Set the running-flag to False. """
        self.running = False
    def run(self):
        """ The entry point of the thread. """
        # Register the handler functions for various messages:
        r = cg.Receiver()
        r.addHandler('increment', self.increment)
        r.addHandler((cg.Process, 'value'), self.sendVal, cg.Message)
        r.addHandler('stop', self.setStop)
        # Keep handling new messages until a stop has been requested
        while self.running:
            r.receive()
To start a thread running this code under candygram
, use:
counter = cg.spawn(ExampleThread().run)
To handle the counter
thread’s responses, you need another Receiver
object, with the proper handler
registered:
response = cg.Receiver()
response.addHandler((counter, int), lambda msg: msg[1], cg.Message)
And here is an example of how you might use these counter
and response
objects:
# Tell thread to increment twice
counter.send('increment')
counter.send('increment')
# Request the thread's current value, then print the thread's response
counter.send((cg.self(), 'value'))
print response.receive()
# Tell thread to increment one more time
counter.send('increment')
# Again, request the thread's current value, then print the thread's response
counter.send((cg.self(), 'value'))
print response.receive()
# Tell the thread to stop running
counter.send('stop')
With the candygram
module
(http://candygram.sourceforge.net), Python
developers can send and receive messages between threads using
semantics nearly identical to those introduced in the Erlang language
(http://www.erlang.org). Erlang is widely
respected for its elegant built-in facilities for concurrent
programming.
Erlang’s approach is simple and yet powerful. To
communicate with another thread, simply send a message to it. You do
not need to worry about locks, semaphores, mutexes, and other such
primitives, to share information among concurrent tasks. Developers of
multitasking software mostly use message passing only to implement a
producer/consumer model. When you combine message passing with the
flexibility of a Receiver
object,
however, it becomes much more powerful. For example, by using timeouts
and message patterns, a thread may easily handle its messages as a
state machine, or as a priority queue.
For those who wish to become more familiar with Erlang,
http://www.erlang.org/download/erlang-book-part1.pdf
(Concurrent Programming in Erlang) provides a
very complete introduction. In particular, the candygram
module implements all of the
functions described in Chapter
5 and sections 7.2, 7.3, and 7.5 of that book.
This recipe offers a very elementary demonstration of how
messages are passed between threads using candygram
. When you run this recipe as a
script, the print
statements will
output the values 2
and then
3
.
It’s important to understand how the candygram.Receiver
class works. The addHandler
method requires at least two
parameters: the first is a message pattern and the second is a handler
function. The Receiver.receive
method invokes a registered handler function, and returns that
function’s result, whenever it finds a message that matches the
associated pattern. Any parameters optionally passed to addHandler
beyond the first two get passed
as parameters to the handler function when the Receiver
calls it. If a parameter is the
candygram.Message
constant, then
receive
replaces that parameter
with the matching message when it calls the handler function.
This recipe’s code contains four different message patterns: 'increment', (cg.Process, 'value'), 'stop', and (counter, int). The 'increment' and 'stop' patterns are simple patterns that match any message consisting solely of the strings 'increment' and 'stop', respectively. The (cg.Process, 'value') pattern matches any message that is a tuple with two items, where the first item is an instance of cg.Process and the second item is the string 'value'. Lastly, the (counter, int) pattern matches any message that is a tuple with two items, where the first item is the counter object and the second item is an integer.
You can find more information about the Candygram package at
http://candygram.sourceforge.net. At that URL,
you can find all details on how to specify message patterns, how to
set a timeout for the Receiver.receive
method, and how to monitor
the running status of spawned threads.
Concurrent Programming in Erlang at
http://www.erlang.org/download/erlang-book-part1.pdf;
the candygram
home page at
http://candygram.sourceforge.net.
Credit: John E. Barham, Sami Hangaslammi, Anthony Baxter
Thread-specific storage is a useful design pattern, and Python
2.3 did not yet support it directly. However, even in 2.3, we could
code it up in terms of a dictionary protected by a lock. For once,
it’s slightly more general, and not significantly harder, to program
to the lower-level thread
module,
rather than to the more commonly useful, higher-level threading
module that Python offers on top
of it:
_tss = {}
try:
    import thread
except ImportError:
    # We're running on a single-threaded platform (or, at least, the Python
    # interpreter has not been compiled to support threads), so we just return
    # the same dict for every call -- there's only one thread around anyway!
    def get_thread_storage():
        return _tss
else:
    # We do have threads; so, to work:
    _tss_lock = thread.allocate_lock()
    def get_thread_storage():
        """ Return a thread-specific storage dictionary. """
        thread_id = thread.get_ident()
        _tss_lock.acquire()
        try:
            return _tss.setdefault(thread_id, {})
        finally:
            _tss_lock.release()
Python 2.4 offers a much simpler and faster implementation, of
course, thanks to the new threading.local
class:
try:
    import threading
except ImportError:
    import dummy_threading as threading
_tss = threading.local()
def get_thread_storage():
    return _tss.__dict__
The main benefit of multithreaded programs is that all
of the threads can share global objects when they need to do so.
Often, however, each thread also needs some storage of its own—for
example, to store a network or database connection unique to itself.
Indeed, each such externally oriented object is generally best kept
under the control of a single thread, to avoid multiple possibilities
of highly peculiar behavior, race conditions, and so on. The
get_thread_storage
function in this recipe solves
this problem by implementing the “thread-specific storage” design
pattern, and specifically by returning a thread-specific storage
dictionary. The calling thread can then use the returned dictionary to
store any kind of data that is private to the thread. This recipe is,
in a sense, a generalization of the get_transaction
function from ZODB, the
object-oriented database underlying Zope.
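Typical use of get_thread_storage, whichever implementation is active, might look like this sketch (open_connection is a hypothetical per-thread setup helper, named here only for illustration):

def get_connection():
    storage = get_thread_storage()
    if 'connection' not in storage:
        # first call from this thread: create its private connection
        storage['connection'] = open_connection()   # hypothetical helper
    return storage['connection']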
One possible extension to this recipe is to add a
delete_thread_storage
function. Such a function would
be useful, particularly if a way could be found to automate its being
called upon thread termination. Python’s threading architecture does
not make this task particularly easy. You could spawn a watcher thread
to do the deletion after a join with the calling thread, but that’s a
rather heavyweight approach. The recipe as presented, without
deletion, is quite appropriate for the common and recommended
architecture in which you have a pool of (typically daemonic) worker
threads (perhaps some of them general workers, with others dedicated
to interfacing to specific external resources) that are spawned at the
start of the program and do not go away until the end of the whole
process.
When multithreading is involved, implementation must
always be particularly careful to detect and prevent race conditions,
deadlocks, and other such conflicts. In this recipe, I have decided
not to assume that a dictionary’s setdefault method is atomic (meaning that no thread switch can occur while setdefault executes)—adding
executes)—adding
a key can potentially change the dictionary’s whole structure, after
all. If I was willing to make such an assumption, I could do away with
the lock and vastly increase performance, but I suspect that such an
assumption might make the code too fragile and dependent on specific
versions of Python. (It seems to me that the assumption holds for
Python 2.3, but, even if that is the case, I want my applications to
survive subtle future changes to Python’s internals.) Another risk is
that, if a thread terminates and a new one starts, the new thread
might end up with the same thread ID as the just-terminated one, and
therefore accidentally share the “thread-specific storage” dictionary
left behind by the just-terminated thread. This risk might be
mitigated (though not eliminated) by providing the
delete_thread_storage
function mentioned in the
previous paragraph. Again, this specific problem does not apply to me,
given the kind of multithreading architecture that I use in my
applications. If your architecture differs, you may want to modify
this recipe’s solution accordingly.
If the performance of this recipe’s version is insufficient for your application’s needs, due to excessive overhead in acquiring and releasing the lock, then, rather than just removing the lock at the risk of making your application fragile, you might consider an alternative:
_creating_threads = True
_tss_lock = thread.allocate_lock()
_tss = {}

class TssSequencingError(RuntimeError):
    pass

def done_creating_threads():
    """ switch from thread-creation to no-more-threads-created state """
    global _creating_threads
    if not _creating_threads:
        raise TssSequencingError('done_creating_threads called twice')
    _creating_threads = False

def get_thread_storage():
    """ Return a thread-specific storage dictionary. """
    thread_id = thread.get_ident()
    # fast approach if thread-creation phase is finished
    if not _creating_threads:
        return _tss[thread_id]
    # careful approach if we're still creating threads
    _tss_lock.acquire()
    try:
        return _tss.setdefault(thread_id, {})
    finally:
        _tss_lock.release()
This variant adds a boolean switch
_creating_threads
, initially True
. As long as the switch is True
, the variant uses a careful
locking-based approach, quite similar to the one presented in this
recipe’s Solution. At some point in time, when all threads that will
ever exist (or at least all that will ever require access to
get_thread_storage
) have been started, and each of
them has obtained its thread-local storage dictionary, your
application calls done_creating_threads
. This sets
_creating_threads
to False
, and every future call to
get_thread_storage
then takes a fast path where it
simply indexes into global dictionary _tss
—no more
acquiring and releasing the lock, no more creating a thread’s
thread-local storage dictionary if it didn’t yet exist.
As long as your application can determine a moment in which it
can truthfully call done_creating_threads
, the
variant in this subsection should definitely afford a substantial
increase in speed compared to this recipe’s Solution. Note that it is
particularly likely that you can use this variant if your application
follows the popular and recommended architecture mentioned previously:
a bounded set of daemonic, long-lived worker threads, all created
early in your program. This is fortunate, because, if your application
is performance-sensitive enough to worry about the locking overhead of
this recipe’s solution, then no doubt you will want to structure your
application that way. The alternative approach of having many
short-lived threads is generally quite damaging to performance.
If your application needs to run only under Python 2.4, you can
get a much simpler, faster, and solid implementation by relying on the
new threading.local
class.
threading.local
returns a new
object on which any thread can get and set arbitrary attributes,
independently from whatever getting and setting other threads may be
doing on the same object. This recipe, in the 2.4 variant, returns the
per-thread _ _dict_ _
of such an
object, for uniformity with the 2.3 variant. This way, your
applications can be made to run on both Python 2.3 and 2.4, using the
best version in each case:
import sys
if sys.version_info >= (2, 4):
    # insert the 2.4 definition of get_thread_storage here
else:
    # insert the 2.3 definition of get_thread_storage here
The 2.4 variant of this recipe also shows off the intended use
of module dummy_threading
, which,
like its sibling dummy_thread
, is
also available in Python 2.3. By conditionally using these dummy
modules, which are available on all platforms, whether or not Python
was compiled with thread support, you may sometimes, with due care, be
able to write applications that can run on any platform, taking
advantage of threading where it’s available but running anyway even
where threading is not available. In the 2.3 variant, we did not use
the similar approach based on dummy_thread
, because the overhead would be
too high to pay on nonthreaded platforms; in the 2.4 variant, overhead
is pretty low anyway, so we went for the simplicity that dummy_threading
affords.
For an exhaustive treatment of the design pattern that describes
thread-specific storage (albeit aimed at C++ programmers), see Douglas
Schmidt, Timothy Harrison, Nat Pryce, Thread-Specific
Storage: An Object Behavioral Pattern for Efficiently Accessing
per-Thread State (http://www.cs.wustl.edu/~schmidt/PDF/TSS-pattern.pdf);
the Library Reference documentation of dummy_thread
, dummy_threading
, and Python 2.4’s threading.local
; ZODB at http://zope.org/Wikis/ZODB/FrontPage.
Credit: Brian Bush, Troy Melhase, David Beach, Martin Miller
You have a task that seems suited to multithreading, but you don’t want to incur the overhead that real thread-switching would impose.
Generators were designed to simplify iteration, but they’re also quite suitable as a basis for cooperative multitasking, also known as microthreading:
import signal
# credit: original idea was based on an article by David Mertz
# http://gnosis.cx/publish/programming/charming_python_b7.txt

# some example 'microthread' generators
def empty(name):
    """ This is an empty task for demonstration purposes. """
    while True:
        print "<empty process>", name
        yield None

def terminating(name, maxn):
    """ This is a counting task for demonstration purposes. """
    for i in xrange(maxn):
        print "Here %s, %s out of %s" % (name, i, maxn)
        yield None
    print "Done with %s, bailing out after %s times" % (name, maxn)

def delay(duration=0.8):
    """ Do nothing at all for 'duration' seconds. """
    import time
    while True:
        print "<sleep %d>" % duration
        time.sleep(duration)
        yield None

class GenericScheduler(object):
    def __init__(self, threads, stop_asap=False):
        signal.signal(signal.SIGINT, self.shutdownHandler)
        self.shutdownRequest = False
        self.threads = threads
        self.stop_asap = stop_asap
    def shutdownHandler(self, n, frame):
        """ Initiate a request to shutdown cleanly on SIGINT. """
        print "Request to shut down."
        self.shutdownRequest = True
    def schedule(self):
        def noop():
            while True:
                yield None
        n = len(self.threads)
        while True:
            for i, thread in enumerate(self.threads):
                try:
                    thread.next()
                except StopIteration:
                    if self.stop_asap:
                        return
                    n -= 1
                    if n == 0:
                        return
                    self.threads[i] = noop()
                if self.shutdownRequest:
                    return

if __name__ == "__main__":
    s = GenericScheduler([empty('boo'), delay(), empty('foo'),
                          terminating('fie', 5), delay(0.5)],
                         stop_asap=True)
    s.schedule()
    s = GenericScheduler([empty('boo'), delay(), empty('foo'),
                          terminating('fie', 5), delay(0.5)],
                         stop_asap=False)
    s.schedule()
Microthreading (or cooperative multitasking) is an important technique. If you want to pursue it in earnest for complex uses, you should definitely look up the possibilities of Christian Tismer’s Stackless, a Python version specialized for microthreading, at http://www.stackless.com/. However, you can get a taste of cooperative multitasking without straying from Python’s core, by making creative use of generators, as shown in this recipe.
A simple approach to cooperative multitasking, such as the one presented in this recipe, is not suitable when your tasks must perform long-running work, particularly I/O tasks that may involve blocking system calls. For such applications, look into real threading, or, as a strong alternative, look into the event-driven approach offered by module asyncore in the Python Standard Library (on a simple scale) and by package Twisted at http://twistedmatrix.com/products/twisted (on a grandiose scale). But if your application has modest I/O needs, and you can slice up any computation your tasks perform into short chunks, each of which you can end with a yield, this recipe may be just what you're looking for.
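For instance, here is one way a long-running summation might be sliced into scheduler-friendly chunks. This is a hypothetical illustration, not part of the recipe; it assumes the GenericScheduler class shown above:

def summing(name, data, chunksize=1000):
    """ Sum 'data' cooperatively, yielding control after each chunk. """
    total = 0
    for start in xrange(0, len(data), chunksize):
        for item in data[start:start+chunksize]:
            total += item
        yield None               # give the other microthreads a turn
    print "%s: total is %s" % (name, total)

An instance such as summing('adder', range(1000000)) can then be passed to GenericScheduler in the same list as the other example tasks.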
David Mertz’s site, chock-full of idiosyncratic, fascinating ideas, is at http://gnosis.cx/; Christian Tismer’s Stackless Python, the best way to do cooperative multitasking in Python (and much else besides), is at http://www.stackless.com/; Twisted Matrix, the best way to do event-driven (asynchronous) programming, is at http://twistedmatrix.com/.
Credit: Bill Bell
In a Windows environment, you want to ensure that only one instance of a script is running at any given time.
Many tricks can be used to avoid starting multiple copies of an application, but they're all quite fragile—except those based on a mutual-exclusion (mutex) kernel object, such as this one. Mark Hammond's precious PyWin32 package supplies all the needed hooks into the Windows APIs to let us exploit a mutex for this purpose:
from win32event import CreateMutex
from win32api import GetLastError
from winerror import ERROR_ALREADY_EXISTS
from sys import exit

handle = CreateMutex(None, 1, 'A unique mutex name')
if GetLastError() == ERROR_ALREADY_EXISTS:
    # Take appropriate action, as this is the second
    # instance of this script; for example:
    print 'Oh! dear, I exist already.'
    exit(1)
else:
    # This is the only instance of the script; let
    # it do its normal work.  For example:
    from time import sleep
    for i in range(10):
        print "I'm running", i
        sleep(1)
The string 'A unique mutex name' must be chosen to be unique to this script, and it must not be dynamically generated, because the string must have the same value for all potential simultaneous instances of the same script. A fresh, globally unique ID that you manually generate and insert at script-authoring time would be a good choice. According to the Windows documentation, the string can contain any characters except backslashes (\). On Windows platforms that implement Terminal Services, you can optionally prefix the string with Global\ or Local\, but such prefixes would make the string invalid for most versions of Windows, including NT, 95, 98, and ME.
The Win32 API call CreateMutex creates a Windows kernel object of the mutual-exclusion (mutex) kind and returns a handle to it. Note that we do not close this handle, because it needs to exist throughout the time this process is running. It's important to let the Windows kernel take care of removing the handle (and the object it indicates, if the handle being removed is the only handle to that kernel object) when our process terminates.
The only thing we really care about is the return code from the
API call, which we obtain by calling the GetLastError
API right after it. That code
is ERROR_ALREADY_EXISTS
if and only
if the mutual-exclusion object we tried to create already exists
(i.e., if another instance of this script is already running).
This approach is perfectly safe and not subject to race conditions and similar anomalies, even if two instances of the script are trying to start at the same time (a reasonably frequent occurrence, e.g., if the user erroneously double-clicks in an Active Desktop setting where a single click already starts the application). The Windows specifications guarantee that only one of the instances will create the mutex, while the other will be informed that the mutex already exists. Mutual exclusion is therefore guaranteed by the Windows kernel itself, and the recipe is as solid as the operating system.
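If you prefer to keep this check out of your script's main flow, it is easy to wrap in a small function. The following is just a sketch layered on the recipe's code (the function name is my own, and the handle is deliberately kept in a global so that it lives as long as the process does):

from win32event import CreateMutex
from win32api import GetLastError
from winerror import ERROR_ALREADY_EXISTS

_mutex_handle = None     # keep the handle alive for the process' lifetime

def i_am_the_only_instance(mutex_name):
    """ Return True if no other instance of this script holds 'mutex_name'. """
    global _mutex_handle
    _mutex_handle = CreateMutex(None, 1, mutex_name)
    return GetLastError() != ERROR_ALREADY_EXISTS

The mutex_name argument must, of course, obey the naming rules discussed above.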
Documentation for the Win32 API in PyWin32
(http://starship.python.net/crew/mhammond/win32/Downloads.html)
or ActivePython (http://www.activestate.com/ActivePython/);
Windows API documentation available from Microsoft (http://msdn.microsoft.com);
Python Programming on Win32, by Mark Hammond and
Andy Robinson (O’Reilly).
Credit: Michael Robin
In a Win32 application, you need to process messages, but you also want to wait for kernel-level waitable objects, and coordinate several activities.
A Windows application’s message loop, also known as its message pump, is at the heart of Windows. It’s worth some effort to ensure that the heart beats properly and regularly:
import win32event
import pythoncom

TIMEOUT = 200  # ms
StopEvent = win32event.CreateEvent(None, 0, 0, None)
OtherEvent = win32event.CreateEvent(None, 0, 0, None)

class myCoolApp(object):
    def OnQuit(self):
        # assume 'areYouSure' is a global function that makes a final
        # check via a message box, a fancy dialog, or whatever else!
        if areYouSure():
            win32event.SetEvent(StopEvent)   # Exit msg pump

def _MessagePump():
    waitables = StopEvent, OtherEvent
    while True:
        rc = win32event.MsgWaitForMultipleObjects(
            waitables,
            0,          # Wait for all = false, so it waits for any one
            TIMEOUT,    # (or win32event.INFINITE)
            win32event.QS_ALLEVENTS)   # Accept all kinds of events
        # You can call a function here, if it doesn't take too long. It will
        # be executed at least every TIMEOUT ms -- possibly a lot more often,
        # depending on the number of Windows messages received.
        if rc == win32event.WAIT_OBJECT_0:
            # Our first event listed, the StopEvent, was triggered, so
            # we must exit, terminating the message pump
            break
        elif rc == win32event.WAIT_OBJECT_0 + 1:
            # Our second event listed, "OtherEvent", was set. Do
            # whatever needs to be done -- you can wait on as many
            # kernel-waitable objects as needed (events, locks,
            # processes, threads, notifications, and so on).
            pass
        elif rc == win32event.WAIT_OBJECT_0 + len(waitables):
            # A windows message is waiting - take care of it. (Don't
            # ask me why a WAIT_OBJECT_MSG isn't defined <
            # WAIT_OBJECT_0...!).
            # This message-serving MUST be done for COM, DDE, and other
            # Windows-y things to work properly!
            if pythoncom.PumpWaitingMessages():
                break   # we received a wm_quit message
        elif rc == win32event.WAIT_TIMEOUT:
            # Our timeout has elapsed.
            # Do some work here (e.g., poll something you can't thread)
            # or just feel good to be alive.
            pass
        else:
            raise RuntimeError("unexpected win32wait return value")
Most Win32 applications must process messages, but you often want to wait on kernel waitables and coordinate a lot of things going on at the same time. A good message pump structure is the key to this, and this recipe exemplifies a reasonably simple but pretty effective one.
With the message pump shown in this recipe, messages and other events get dispatched as soon as they are posted, and a timeout allows you to poll other components. You may need to poll if the proper calls or event objects are not exposed in your Win32 event loop, as many components insist on running only on the application’s main thread and cannot run on spawned (secondary) threads.
You can add many other refinements, just as you can to any other Win32 message-pump approach. Python lets you do this with as much precision as C does, thanks to Mark Hammond's PyWin32 package (which used to be known as win32all). However, the relatively simple message pump presented in this recipe is already a big step up from the typical naive application that can either serve its message loop or wait on kernel waitables, but not both.
The key to this recipe is the Windows API call MsgWaitForMultipleObjects, which takes several parameters. The first is a tuple of kernel objects you want to wait for. The second parameter is a flag that is normally 0. The value 1 indicates that you should wait until all the kernel objects in the first parameter are signaled, but my experience suggests that you almost invariably want to stop waiting when any one of these objects is signaled, so this parameter will almost always be 0. The third parameter is a timeout period (in milliseconds), or win32event.INFINITE if you are sure you do not need to do any periodic polling. The fourth is a flag that specifies which Windows messages may interrupt the wait; always pass win32event.QS_ALLEVENTS here, to make sure any Windows message interrupts the wait. (This order matches the call in the recipe's code: waitables, wait-for-all flag, timeout, wake mask.)
This function is a polling loop and, sure enough, it loops (with a while True, which is terminated only by a break within it). At each leg of the loop, it calls the API that waits for multiple objects. When that API stops waiting, it returns a code that explains why it stopped waiting. A value between win32event.WAIT_OBJECT_0 and win32event.WAIT_OBJECT_0+N-1, inclusive (where N is the number of waitable kernel objects in the tuple you passed as the first parameter), means that the wait finished because an object was signaled (being signaled means different things for each kind of waitable kernel object). The return code's difference from win32event.WAIT_OBJECT_0 is the index of the relevant object in the tuple.
A return value of win32event.WAIT_OBJECT_0+N means that the wait finished because a message was pending, and in this case, our recipe processes all pending Windows messages via a call to pythoncom.PumpWaitingMessages. (That function, in turn, returns a true result if a WM_QUIT message was received, so in this case, we break out of the whole while loop.) A code of win32event.WAIT_TIMEOUT means the wait finished because of a timeout, so we can do our polling there. In this case, no message is waiting, and none of our kernel objects of interest were signaled.
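To make these decoding rules concrete, here is a purely illustrative helper (the name and structure are my own, not part of the recipe) that classifies the return code:

import win32event

def decode_wait_result(rc, n):
    """ Classify MsgWaitForMultipleObjects' return code 'rc', where 'n'
        is the number of kernel waitables passed in the first argument. """
    if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + n:
        return 'object', rc - win32event.WAIT_OBJECT_0   # index into the tuple
    elif rc == win32event.WAIT_OBJECT_0 + n:
        return 'message', None      # one or more Windows messages are pending
    elif rc == win32event.WAIT_TIMEOUT:
        return 'timeout', None      # nothing signaled, no message waiting
    else:
        return 'unexpected', rc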
Basically, the way to tune this recipe for yourself is by using the right kernel objects as waitables (with an appropriate response to each) and by doing whatever you need to do periodically in the polling case. While this means you must have some detailed understanding of Win32, of course, it’s still quite a bit easier than designing your own special-purpose, message-loop function from scratch.
I suspect that a purist would find some way or other to wrap all of this message pumping into a neat module, letting each application customize its use of the module by passing in a list of waitables, some dictionary to map different waitables to chunks of code to execute, and a partridge in a pear tree. Go ahead, turn it all into a custom metaclass if you wish, see if I care. For once, though, I think the right approach to reusing this code is to copy it into your application’s source directories, and use your trusty text editor (gasp!) to tailor the message pump to your application’s exact needs.
Documentation for the Win32 API in PyWin32
(http://starship.python.net/crew/mhammond/win32/Downloads.html)
or ActivePython (http://www.activestate.com/ActivePython/);
Windows API documentation available from Microsoft (http://msdn.microsoft.com);
Mark Hammond and Andy Robinson, Python Programming on
Win32 (O’Reilly).
Credit: Sébastien Keim, Tino Lange, Noah Spurrier
You want to drive an external process that accepts commands from its standard input, and you don’t care about the responses (if any) that the external process may emit on its standard output.
If you need to drive only the other process' input and don't care about its output, the simple os.popen function is enough. For example, here is a way to do animated graphics by driving the free program gnuplot via os.popen:
import os
f = os.popen('gnuplot', 'w')
print >>f, "set yrange[-300:+300]"
for n in range(300):
    print >>f, "plot %i*cos(x)+%i*log(x+10)" % (n, 150-n)
    f.flush()
f.close()
When you want to use Python as a glue language, sometimes (in particularly easy cases) the simple function popen (from the standard library module os) may be all you need. Specifically, os.popen may suffice when you need to drive an external program that accepts commands on its standard input, as long as you can ignore any response that the program might be making on its standard output (and also error messages that the program might be sending to its standard error). A good example is given by the free plotting program gnuplot. (os.popen may also suffice when you need to obtain the output from a program that does not need to read its standard input.)
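The read-mode case is even simpler. For example, assuming a Unix-like system with an ls command on the PATH:

import os
listing = os.popen('ls -l', 'r').read()   # 'r' (read mode) is the default
print listing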
The statement f = os.popen('gnuplot', 'w') creates a file-like object connected to the standard input of the program it launches, namely 'gnuplot'. (To try this recipe, you have to have gnuplot installed on your PATH, but since gnuplot is freely available and widely ported software, that should not be a problem!) Whatever we write to f, the external process receives on its standard input, just as would happen if we used that same program interactively. For more of the same, check out http://sourceforge.net/projects/gnuplot-py/: it's a rich and interesting Python interface to gnuplot implemented entirely on the basis of the simple idea shown in this recipe!
When your needs are more sophisticated than os.popen can accommodate, you may want to look at os.popen2 and other such higher-numbered functions in module os, or, in Python 2.4, the new standard library module subprocess. However, in many cases, you're likely to be disappointed: as soon as you get beyond the basics, driving (from your own programs) other external programs that were designed to be used interactively can become more than a little frustrating. Fortunately, a solution is at hand: it's pexpect, a third-party Python module that you can find at http://pexpect.sourceforge.net/. pexpect is designed specifically for the task of driving other programs, and it lets you check on the other program's responses as well as sending commands to the other program's standard input. Still, while pexpect will most definitely offer you all the power you need, os.popen will probably suffice when you don't need anything fancy!
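To give a taste of pexpect, here is a rough sketch along the lines of the examples in pexpect's own documentation (untested here, and assuming a reachable anonymous FTP server):

import pexpect      # third-party, from http://pexpect.sourceforge.net/

child = pexpect.spawn('ftp ftp.openbsd.org')
child.expect('Name .*: ')          # wait for the login prompt...
child.sendline('anonymous')        # ...then answer it
child.expect('Password:')
child.sendline('guest@example.com')
child.expect('ftp> ')
child.sendline('ls')
child.expect('ftp> ')
print child.before                 # everything 'ls' printed, up to the prompt
child.sendline('bye')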
Module os (specifically os.popen) in the Library Reference and Python in a Nutshell; gnuplot is at http://www.gnuplot.info/; gnuplot.py is at http://sourceforge.net/projects/gnuplot-py/; pexpect is at http://pexpect.sourceforge.net/.
Credit: Brent Burley, Bradey Honsinger, Tobias Polzin, Jonathan Cano, Padraig Brady
You need to run an external process in a Unix-like environment and capture both the output and error streams from that external process.
The popen2 module lets you capture both streams, but you also need help from module fcntl, to make the streams nonblocking and thus avoid deadlocks, and from module select, to orchestrate the action:
import os, popen2, fcntl, select

def makeNonBlocking(fd):
    fl = fcntl.fcntl(fd, os.F_GETFL)
    try:
        fcntl.fcntl(fd, os.F_SETFL, fl | os.O_NDELAY)
    except AttributeError:
        fcntl.fcntl(fd, os.F_SETFL, fl | os.FNDELAY)

def getCommandOutput(command):
    child = popen2.Popen3(command, 1)  # Capture stdout and stderr from command
    child.tochild.close()              # don't need to write to child's stdin
    outfile = child.fromchild
    outfd = outfile.fileno()
    errfile = child.childerr
    errfd = errfile.fileno()
    makeNonBlocking(outfd)             # Don't deadlock! Make fd's nonblocking.
    makeNonBlocking(errfd)
    outdata, errdata = [], []
    outeof = erreof = False
    while True:
        to_check = [outfd]*(not outeof) + [errfd]*(not erreof)
        ready = select.select(to_check, [], [])   # Wait for input
        if outfd in ready[0]:
            outchunk = outfile.read()
            if outchunk == '':
                outeof = True
            else:
                outdata.append(outchunk)
        if errfd in ready[0]:
            errchunk = errfile.read()
            if errchunk == '':
                erreof = True
            else:
                errdata.append(errchunk)
        if outeof and erreof:
            break
        select.select([], [], [], .1)  # Allow a little time for buffers to fill
    err = child.wait()
    if err != 0:
        raise RuntimeError, '%r failed with exit code %d %s' % (
            command, err, ''.join(errdata))
    return ''.join(outdata)

def getCommandOutput2(command):
    child = os.popen(command)
    data = child.read()
    err = child.close()
    if err:
        raise RuntimeError, '%r failed with exit code %d' % (command, err)
    return data
This recipe shows how to execute a Unix shell command and
capture the output and error streams in Python. By contrast, os.system
sends both streams directly to the
terminal. The function getCommandOutput
presented in
this recipe executes a command and returns the command’s output. If
the command fails, getCommandOutput
raises an
exception, using the text captured from the command’s stderr
as part of the exception’s
arguments.
Most of the complexity of this code is due to the difficulty of
capturing both the output and error streams of the child process
independently and at the same time. Normal (blocking) read calls may
deadlock if the child is trying to write to one stream, and the parent
is waiting for data on the other stream; so, the streams must be set
to nonblocking, and select
must be
used to wait for data on either of the streams.
Note that the second select call is included just to add a 0.1-second sleep after each read. Counterintuitively, this allows the code to run much faster, since it gives the child time to put more data in the buffer. Without it, the parent may try to read only a few bytes at a time, which can be very expensive. Calling time.sleep(0.1) should be exactly equivalent, but since I was already, necessarily, calling select.select elsewhere in the recipe's code, I decided not to also import module time needlessly.
If you want to capture only the output and don’t mind the error
stream going to the terminal, you can use the much simpler code
presented in getCommandOutput2
. If you want to
suppress the error stream altogether, that’s easy, too—just append
2>/dev/null
to the command. For
example:
listing = getCommandOutput2('ls -1 2>/dev/null')
Another possibility is given by the os.popen4
function, which combines the
output and error streams of the child process. However, in that case
the streams are combined in a potentially messy way, depending on how
they are buffered in the child process, so this recipe can
help.
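For completeness, here is what the combined-streams alternative looks like; a minimal sketch, and note that exactly how the two streams interleave depends on the child's buffering:

import os
stdin, stdout_and_stderr = os.popen4('ls . no-such-file')
print stdout_and_stderr.read()     # output and errors, merged into one stream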
In Python 2.4, you can use class Popen, instead of popen2.Popen3, from the new standard library module subprocess. However, the issues highlighted in this recipe (namely, the need to use modules fcntl and select to make files nonblocking and coordinate the loop that interacts with the child process) aren't really affected by whether you use popen2 or subprocess.
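That said, for the common case in which you simply need both streams captured in full, subprocess's communicate method performs the equivalent juggling internally. A minimal sketch (Python 2.4), mirroring getCommandOutput:

from subprocess import Popen, PIPE

def getCommandOutput3(command):
    """ Capture stdout and stderr of a shell command via subprocess. """
    child = Popen(command, shell=True, stdout=PIPE, stderr=PIPE)
    outdata, errdata = child.communicate()   # reads both streams to EOF
    if child.returncode != 0:
        raise RuntimeError, '%r failed with exit code %d %s' % (
            command, child.returncode, errdata)
    return outdata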
This recipe does, as advertised, require a rather Unix-like underlying platform. Cygwin, which does a generally great job of emulating Unix on top of Windows, is not sufficient; for example, it offers no way to set files to nonblocking mode, nor to select on general files. (Under Windows, you are allowed to select only on sockets, not on other files.) If you must run on such problematic, non-Unix platforms, you may prefer a very different approach, based on using temporary files:
import os, tempfile

def getCommandOutput(command):
    outfile = tempfile.mktemp()
    errfile = tempfile.mktemp()
    cmd = "( %s ) > %s 2> %s" % (command, outfile, errfile)
    err = os.system(cmd) >> 8
    try:
        if err != 0:
            raise RuntimeError, '%r failed with exit code %d %s' % (
                command, err, file(errfile).read())
        return file(outfile).read()
    finally:
        os.remove(outfile)
        os.remove(errfile)
Documentation of the standard library modules os, popen2, fcntl, select, and tempfile in the Library Reference and Python in a Nutshell; (Python 2.4 only) module subprocess in the Library Reference.
Credit: Jürgen Hermann, Andy Gimblett, Josh Hoyt, Noah Spurrier, Jonathan Bartlett, Greg Stein
You need to fork a daemon process on a Unix or Unix-like system, which, in turn, requires a certain precise sequence of system calls.
Unix daemon processes must detach from their controlling terminal and process group. Doing so is not hard, but it does require some care, so it’s worth writing a daemonize.py module once and for all:
import sys, os
''' Module to fork the current process as a daemon.
    NOTE: don't do any of this if your daemon gets started by inetd!
    inetd does all you need, including redirecting standard file
    descriptors; the chdir( ) and umask( ) steps are the only ones
    you may still want.
'''
def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
    ''' Fork the current process as a daemon, redirecting standard file
        descriptors (by default, redirects them to /dev/null).
    '''
    # Perform first fork.
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)   # Exit first parent.
    except OSError, e:
        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)
    # Decouple from parent environment.
    os.chdir("/")
    os.umask(0)
    os.setsid()
    # Perform second fork.
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)   # Exit second parent.
    except OSError, e:
        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)
    # The process is now daemonized, redirect standard file descriptors.
    for f in sys.stdout, sys.stderr:
        f.flush()
    si = file(stdin, 'r')
    so = file(stdout, 'a+')
    se = file(stderr, 'a+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())

def _example_main():
    ''' Example main function: print a count & timestamp each second '''
    import time
    sys.stdout.write('Daemon started with pid %d\n' % os.getpid())
    sys.stdout.write('Daemon stdout output\n')
    sys.stderr.write('Daemon stderr output\n')
    c = 0
    while True:
        sys.stdout.write('%d: %s\n' % (c, time.ctime()))
        sys.stdout.flush()
        c = c + 1
        time.sleep(1)

if __name__ == "__main__":
    daemonize('/dev/null', '/tmp/daemon.log', '/tmp/daemon.log')
    _example_main()
Forking a daemon on Unix requires a certain specific sequence of
system calls, which is explained in W. Richard Stevens’ seminal book,
Advanced Programming in the Unix Environment
(Addison-Wesley). We need to fork
twice, terminating each parent process and letting only the grandchild
of the original process run the daemon’s code. This allows us to
decouple the daemon process from the calling terminal, so that the
daemon process can keep running (typically as a server process without
further user interaction, like a web server) even after the calling
terminal is closed. The only visible effect of doing so is that when
your script runs this module’s daemonize
function,
you get your shell prompt back immediately.
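In practice, a script that wants to become a daemon just calls the function before settling into its real work. For example (run_my_server is a hypothetical stand-in for your script's long-running task):

import daemonize
daemonize.daemonize(stdout='/tmp/myserver.log', stderr='/tmp/myserver.log')
# from here on, we are the daemonized grandchild process
run_my_server()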
For all of the details about how and why this works in Unix and Unix-like systems, see Stevens’ wonderful book. Another important source of information on both practical and theoretical issues about “daemon forking” can be found as part of the Unix Programming FAQ, at http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16.
To summarize: the first fork lets the shell return, and also lets you do a setsid (to remove you from your controlling terminal, so you can't accidentally be sent a signal). However, setsid makes this process a “session leader”, which means that if the process ever opens any terminal, it will become the process' controlling terminal. We do not want a daemon to have any controlling terminal, which is why we fork again. After the second fork, the process is no longer a “session leader”, so it can open any file (including a terminal) without thereby accidentally reacquiring a controlling terminal.
Both Stevens and the Unix Programming FAQ provide examples in the C programming language, but since the Python Standard Library exposes a full POSIX interface, you can also do it all in Python. Typical C code for a daemon fork translates almost literally to Python; the only difference you have to care about—a minor detail—is that Python's os.fork does not return -1 on errors, but rather throws an OSError exception. Therefore, rather than testing for a less-than-zero return code from fork, as we would in C, we run the fork in the try clause of a try/except statement, so that we can catch the exception, should it happen, and print appropriate diagnostics to standard error.
Documentation of the standard library module os in the Library Reference and Python in a Nutshell; Unix manpages for the fork, umask, and setsid system calls; W. Richard Stevens, Advanced Programming in the Unix Environment (Addison-Wesley); also, the Unix Programming FAQ on daemon forking, at http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16.