14
The Multiprocessing, Threading, and Concurrent.Futures Modules

When we eliminate the complexities of shared state and design around pure functions with non-strict processing, we can leverage concurrency and parallelism to improve performance. In this chapter, we’ll look at some of the multiprocessing and multithreading techniques that are available to us. Python library packages become particularly helpful when applied to algorithms designed from a functional viewpoint.

The central idea here is to distribute a functional program across several threads within a process or across several processes in a CPU. If we’ve created a sensible functional design, we can avoid complex interactions among application components; we have functions that accept argument values and produce results. This is an ideal structure for a process or a thread.

In this chapter, we’ll focus on several topics:

  • The general idea of functional programming and concurrency.

  • What concurrency really means when we consider cores, CPUs, and OS-level concurrency and parallelism. It’s important to note that concurrency won’t magically make a bad algorithm faster.

  • Using the built-in multiprocessing and concurrent.futures modules. These modules allow a number of parallel execution techniques. The external dask package can do much of this as well.

We’ll focus on process-level parallelism more than multithreading. Using process parallelism allows us to completely ignore Python’s Global Interpreter Lock (GIL).

For more information on Python’s GIL, see https://docs.python.org/3/glossary.html#term-global-interpreter-lock. Also see https://peps.python.org/pep-0684/ for a proposal to alter the way the GIL operates. Additionally, see https://github.com/colesbury/nogil for a project that proposes a way to remove the GIL entirely.

The GIL is very much part of Python 3.10, meaning some kinds of compute-intensive multithreading won’t show significant speedups.

We’ll focus on concurrency throughout the chapter. Concurrent work is interleaved, distinct from parallel work, which requires multiple cores or multiple processors. We don’t want to dig too deeply into the nuanced distinctions between concurrency and parallelism. Our focus is on leveraging a functional approach, more than exploring all of the ways work can be accomplished in a modern CPU with a multi-processing OS.

14.1 Functional programming and concurrency

The most effective concurrent processing occurs when there are no dependencies among the tasks being performed. The biggest difficulty in developing concurrent (or parallel) programming is the complications arising from coordinating updates to shared resources, where tasks depend on a common resource.

When following functional design patterns, we tend to avoid stateful programs. A functional design should minimize or eliminate concurrent updates to shared objects. If we can design software where lazy, non-strict evaluation is central, we can also design software where concurrent evaluation is helpful. In some cases, parts of an application can have an embarrassingly parallel design, where most of the work can be done concurrently with few or no interactions among computations. Mappings and filterings, in particular, benefit from parallel processing; reductions typically can’t be done in parallel.

The frameworks we’ll focus on all make use of an essential map() function to allocate work to multiple workers in a pool. This fits nicely with the higher-order functional design we’ve been looking at throughout this book. If we’ve built our application with a particular focus on the map() function, then partitioning the work into processes or threads should not involve a breaking change.
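As a minimal sketch of this idea, consider a hypothetical pure function, transform(), that is not part of this chapter’s example. A pool’s map() method can stand in for the built-in map() function with almost no change to the surrounding design:

from multiprocessing import Pool

def transform(n: int) -> int:
    """A stand-in for a compute-intensive pure function with no shared state."""
    return n * n

if __name__ == "__main__":
    with Pool() as workers:
        results = workers.map(transform, range(8))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]

The worker function must be defined at the module level so it can be pickled and sent to the worker processes. The if __name__ == "__main__": guard prevents child processes from re-running the pool-creation code when they import the module.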

14.2 What concurrency really means

In a small computer, with a single processor and a single core, all evaluations are serialized through the one and only core of the processor. The OS will interleave multiple processes and multiple threads through clever time-slicing arrangements to make it appear as if things are happening concurrently.

On a computer with multiple CPUs or multiple cores in a single CPU, there can be some actual parallel processing of CPU instructions. All other concurrency is simulated through time slicing at the OS level. A macOS laptop can have 200 concurrent processes that share the CPU; this is many more processes than the number of available cores. From this, we can see that OS time slicing is responsible for most of the apparently concurrent behavior of the system as a whole.

14.2.1 The boundary conditions

Let’s consider a hypothetical algorithm that has a complexity described by O(n²). This generally means two nested for statements, each of which is processing n items. Let’s assume the inner for statement’s body involves 1,000 Python operation codes. When processing 10,000 objects, this could execute 100 billion Python operations. We can call this the essential processing budget. We can try to allocate as many processes and threads as we feel might be helpful, but the processing budget can’t change.

The individual CPython bytecodes—the internal implementation of Python statements and expressions—don’t all share a single, uniform execution time. However, a long-term average on a macOS laptop shows that we can expect about 60 million bytecode operations to be executed per second. This means that our 100 billion bytecode operations could take about 1,666 seconds, or 28 minutes.

If we have a dual-processor, four-core computer, then we might cut the elapsed time to 25% of the original total: about 7 minutes. This presumes that we can partition the work into four (or more) independent OS processes.

The important consideration here is that the overall budget of 100 billion bytecodes can’t be changed. Concurrency won’t magically reduce the workload. It can only change the schedule to perhaps reduce the elapsed time to execute all those bytecodes.

Switching to a better algorithm with a complexity of O(n log n) can reduce the workload dramatically. We need to measure the actual speedup to determine the impact; the following example includes a number of assumptions. Instead of doing 10,000² = 100,000,000 iterations, we may only do 10,000 × log₂(10,000) ≈ 132,877 iterations, dropping from 100 billion operations to a number on the order of 133 million operations. This could be as little as 1/750 of the original time. Concurrency can’t provide the kind of dramatic improvements that algorithm change will have.
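A short back-of-the-envelope computation, using the same assumptions as above (1,000 opcodes per iteration and roughly 60 million opcodes per second), makes the arithmetic concrete:

import math

n = 10_000
ops_per_iteration = 1_000        # assumed inner-loop budget
ops_per_second = 60_000_000      # assumed long-term average throughput

quadratic = n ** 2 * ops_per_iteration                   # 100,000,000,000 operations
n_log_n = int(n * math.log2(n)) * ops_per_iteration      # about 132,877,000 operations

print(quadratic / ops_per_second / 60)   # roughly 28 minutes
print(quadratic / n_log_n)               # roughly a 750-fold reduction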

14.2.2 Sharing resources with process or threads

The OS assures us there is little or no interaction between processes. When creating an application where multiple processes must interact, a common OS resource must be explicitly shared. This can be a common file, a shared-memory object, or a semaphore with a shared state between the processes. Processes are inherently independent; interaction among them is the exception, not the rule.

Multiple threads, in contrast, are part of a single process; all threads of a process generally share resources, with one special case. Thread-local memory can be freely used without interference from other threads. Outside thread-local memory, operations that write to memory can set the internal state of the process in a potentially unpredictable order. One thread can overwrite the results of another thread. A technique for mutually exclusive access—often a form of locking—must be used to avoid problems. As noted previously, the overall sequence of instructions from concurrent threads and processes is generally interleaved among the cores in an unpredictable order. With this concurrency comes the possibility of destructive updates to shared variables and the need for mutually exclusive access.

The existence of concurrent object updates can create havoc when trying to design multithreaded applications. Locking is one way to avoid concurrent writes to shared objects. Avoiding shared objects in general is another viable design technique. The second technique—avoiding writes to shared objects—is often also applicable to functional programming.

In CPython, the GIL is used to ensure that OS thread scheduling will not interfere with the internals of maintaining Python data structures. In effect, the GIL changes the granularity of scheduling from machine instructions to groups of Python virtual machine operations.

Pragmatically, the performance impact of the GIL on a wide variety of application types is often negligible. For the most part, compute-intensive applications tend to see the largest impact from GIL scheduling. I/O-intensive applications see little impact because the threads spend more time waiting for I/O to complete. A far greater impact on performance comes from the fundamental inherent complexity of the algorithm being implemented.

14.2.3 Where benefits will accrue

A program that does a great deal of calculation and relatively little I/O will not see much benefit from concurrent processing on a single core. If a calculation has a budget of 28 minutes of computation, then interleaving the operations in different ways won’t have a dramatic impact. Using eight cores for parallel computation may cut the time by approximately one-eighth. The actual time savings depend on the OS and language overheads, which are difficult to predict.

When a calculation involves a great deal of I/O, then interleaving CPU processing while waiting for I/O requests to complete can dramatically improve performance. The idea is to do computations on some pieces of data while waiting for the OS to complete the I/O of other pieces of data. Because I/O generally involves a great deal of waiting, an eight-core processor can interleave the work from dozens (or hundreds) of concurrent I/O requests.

Concurrency is a core principle behind Linux. If we couldn’t do computation while waiting for I/O, then our computer would freeze while waiting for each network request to finish. A website download would involve waiting for the initial HTML and then waiting for each individual graphic to arrive. All the while, the keyboard, mouse, and display would not work.

Here are two approaches to designing applications that interleave computation and I/O:

  • We can create a pipeline of processing stages. An individual item must move through all of the stages where it is read, filtered, computed, aggregated, and written. The idea of multiple concurrent stages means there will be distinct data objects in each stage. Time slicing among the stages will allow computation and I/O to be interleaved.

  • We can create a pool of concurrent workers, each of which performs all of the processing for a data item. The data items are assigned to workers in the pool and the results are collected from the workers.

The differences between these approaches aren’t crisp. It’s common to create a hybrid mixture where one stage of a pipeline involves a pool of workers to make that stage as fast as the other stages. There are some formalisms that make it somewhat easier to design concurrent programs. The Communicating Sequential Processes (CSP) paradigm can help design message-passing applications. Packages such as pycsp can be used to add CSP formalisms to Python.

I/O-intensive programs often gain the most dramatic benefits from concurrent processing. The idea is to interleave I/O and processing. CPU-intensive programs will see smaller benefits from concurrent processing.

14.3 Using multiprocessing pools and tasks

Python’s multiprocessing package introduces the concept of a Pool object. A Pool object contains a number of worker processes and expects these processes to be executed concurrently. This package allows OS scheduling and time slicing to interleave execution of multiple processes. The intention is to keep the overall system as busy as possible.

To make the most of this capability, we need to decompose our application into components for which non-strict, concurrent execution is beneficial. The overall application must be built from discrete tasks that can be processed in an indefinite order.

An application that gathers data from the internet through web scraping, for example, is often optimized through concurrent processing. A number of individual processes can be waiting for the data to download, while others are performing the scraping operation on data that’s been received. We can create a Pool object of several identical workers, which implement the website scraping. Each worker is assigned tasks in the form of URLs to be analyzed. Multiple workers waiting for downloads have little processing overhead. The workers with completely downloaded pages, on the other hand, can perform the real work of extracting data from the content.

An application that analyzes multiple log files is also a good candidate for concurrent processing. We can create a Pool object of analytical workers. We can assign each log file to a worker; this allows reading and analysis to proceed concurrently among the various workers in the Pool object. Each individual worker will be performing both I/O and computation. However, some workers can be analyzing while other workers are waiting for I/O to complete.

Because the benefits depend on difficult-to-predict timing for input and output operations, multiprocessing always involves experimentation. Changing the pool size and measuring elapsed time is an essential part of implementing concurrent applications.

14.3.1 Processing many large files

Here is an example of a multiprocessing application. We’ll parse Common Log Format (CLF) lines in web log files. This is the generally used format for web server access logs. The lines tend to be long, but look like the following when wrapped to the book’s margins:

99.49.32.197 - - [01/Jun/2012:22:17:54 -0400] "GET /favicon.ico\ 
HTTP/1.1" 200 894 "-" "Mozilla/5.0 (Windows NT 6.0)\ 
AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.52\ 
Safari/536.5"

We often have large numbers of files that we’d like to analyze. The presence of many independent files means that concurrency will have some benefit for our scraping process. Some workers will be waiting for data, while others can be doing the compute-intensive portion of the work.

We’ll decompose the analysis into two broad areas of functionality. The first phase of processing is the essential parsing of the log files to gather the relevant pieces of information. We’ll further decompose the parsing phase into four stages. They are as follows:

  1. All the lines from multiple source log files are read.

  2. Then, we create simple NamedTuple objects from the lines of log entries in a collection of files.

  3. The details of more complex fields such as dates and URLs are parsed separately.

  4. Uninteresting paths from the logs are rejected, leaving the interesting paths for further processing.

Once past the parsing phase, we can perform a large number of analyses. For our purposes in demonstrating the multiprocessing module, we’ll look at a simple analysis to count occurrences of specific paths.

The first portion is reading from the source files. Python’s use of file iterators will translate into lower-level OS requests for the buffering of data. Each OS request means that the process must wait for the data to become available.

Clearly, we want to interleave the other operations so that they are not all waiting for I/O to complete. The operations can be imagined to form a spectrum, from processing individual rows to processing whole files. We’ll look at interleaving whole files first, as this is relatively simple to implement.

The functional design for parsing Apache CLF files can look as follows:

data = path_filter( 
    access_detail_iter( 
        access_iter( 
            local_gzip(filename))))

This function decomposes the larger parsing problem into a number of functions. The local_gzip() function reads rows from locally cached GZIP files. The access_iter() function creates a NamedTuple object for each row in the access log. The access_detail_iter() function expands on some of the more difficult-to-parse fields. Finally, the path_filter() function discards some paths and file extensions that aren’t of much analytical value.

It can help to visualize this kind of design as a shell-like pipeline of processing, as shown here:

(local_gzip(filename) | access_iter 
    | access_detail_iter | path_filter) > data

This borrows the shell notation of a pipe (|) to pass data from process to process. Python doesn’t have this operator directly.

Pragmatically, we can use the toolz module to define this pipeline:

from toolz.functoolz import pipe 
 
data = pipe(filename, 
    local_gzip, 
    access_iter, 
    access_detail_iter, 
    path_filter 
)

For more on the toolz module, see Chapter 11, The Toolz Package.

We’ll focus on designing these four functions that process data in stages. The idea is to interleave intensive processing with waiting for I/O to finish.

14.3.2 Parsing log files – gathering the rows

Here is the first stage in parsing a large number of files: reading each file and producing a simple sequence of lines. As the log files are saved in the .gzip format, we need to open each file with the gzip.open() function.

The following local_gzip() function reads lines from locally cached files:

from collections.abc import Iterator 
import gzip 
from pathlib import Path 
 
def local_gzip(zip_path: Path) -> Iterator[str]: 
    with gzip.open(zip_path, "rb") as log_file: 
        yield from ( 
            line.decode('us-ascii').rstrip() 
            for line in log_file 
        )

The function iterates through all lines of a file. We’ve created a composite function that encapsulates the details of opening a log file compressed with the .gzip format, breaking a file into a sequence of lines, and stripping the trailing newline (\n) characters.

Additionally, this function also encapsulates a non-standard encoding for the files. Instead of Unicode, encoded in a standard format like UTF-8 or UTF-16, the files are encoded in old US-ASCII. This is very similar to UTF-8. In order to be sure the log entries are read properly, the exact encoding is supplied.

This function is a close fit with the way the multiprocessing module works. We can create a worker pool and map tasks (such as .gzip file reading) to the pool of processes. If we do this, we can read these files in parallel; the open file objects will be part of separate processes, and the resource consumption and wait time will be managed by the OS.

An extension to this design can include a second function to transfer files from the web host using SFTP or a RESTful API if one is available. As the files are collected from the web server, they can be analyzed using the local_gzip() function.

The results of the local_gzip() function are used by the access_iter() function to create named tuples for each row in the source file that describes file access by the web server.

14.3.3 Parsing log lines into named tuples

Once we have access to all of the lines of each log file, we can extract details of the access that’s described. We’ll use a regular expression to decompose the line. From there, we can build a NamedTuple object.

Each individual access can be summarized as a subclass of NamedTuple, as follows:

from typing import NamedTuple, Optional, cast 
import re 
 
class Access(NamedTuple): 
    host: str 
    identity: str 
    user: str 
    time: str 
    request: str 
    status: str 
    bytes: str 
    referer: str 
    user_agent: str 
 
    @classmethod 
    def create(cls: type, line: str) -> Optional["Access"]: 
        format_pat = re.compile( 
            r"(?P<host>[d.]+)s+" 
            r"(?P<identity>S+)s+" 
            r"(?P<user>S+)s+" 
            r"[(?P<time>.+?)]s+" 
            r’"(?P<request>.+?)"s+’ 
            r"(?P<status>d+)s+" 
            r"(?P<bytes>S+)s+" 
            r’"(?P<referer>.*?)"s+’ 
            r’"(?P<user_agent>.+?)"s*’ 
            ) 
        if match := format_pat.match(line): 
            return cast(Access, cls(**match.groupdict())) 
        return None

The method for building an Access object, create(), from the source text contains a lengthy regular expression to parse lines in a CLF file. This is quite complex, but we can use a railroad diagram to help simplify it. The following image shows the various elements and how they’re identified by the regular expression:


Figure 14.1: Regular expression diagram for parsing log files

This diagram shows the sequence of clauses in the regular expression. Each rectangular box represents a named capture group. For example, (?P<host>[\d\.]+) is a group named host. The ovals and circles are classes of characters (for example, a digit) or specific characters (for example, the . character) that comprise the contents of the capture group.

We used this regular expression to break each row into a dictionary of nine individual data elements. The use of [] and " to delimit complex fields such as the time, request, referer, and user_agent parameters can be handled elegantly by transforming the text into a NamedTuple object.

We’ve taken pains to ensure that the NamedTuple field names match the regular expression group names in the (?P<name>) constructs for each portion of the record. By making sure the names match, we can very easily transform the parsed dictionary into a tuple for further processing. This also means we’ve kept the referer spelling (rather than the correct English referrer) to match the spelling used in the RFC documentation.

Here is the access_iter() function, which requires each file to be represented as an iterator over the lines of the file:

from collections.abc import Iterator 
 
def access_iter(source_iter: Iterator[str]) -> Iterator[Access]: 
    for line in source_iter: 
        if access := Access.create(line): 
            yield access

The output from the local_gzip() function is a sequence of strings. The outer sequence is based on the lines from individual log files. If the line matches the given pattern, it’s a file access of some kind. We can create an Access instance from the dictionary of text parsed by the regular expression. Non-matching lines are quietly discarded.

The essential design pattern here is to build an immutable object from the results of a parsing function. In this case, the parsing function is a regular expression matcher. Other kinds of parsing can also fit this design pattern.

There are some alternative ways to do this. For example, here’s a function that applies map() and filter():

def access_iter_2(source_iter: Iterator[str]) -> Iterator[Access]: 
    return filter( 
        None, 
        map( 
            Access.create, 
            source_iter 
        ) 
    )

This access_iter_2() function transforms the output from the local_gzip() function into a sequence of Access instances. In this case, we apply the Access.create() function to the string iterator that results from reading a collection of files. The filter() function removes any None objects from the result of the map() function.

Our point here is to show that we have a number of functional styles for parsing files. In Chapter 4, Working with Collections, we looked at very simple parsing. Here, we’re performing more complex parsing, using a variety of techniques.

14.3.4 Parsing additional fields of an Access object

The initial Access object created previously doesn’t decompose some inner elements in the nine fields that comprise an access log line. We’ll parse those items separately from the overall decomposition into high-level fields. Doing these parsing operations separately makes each stage of processing simpler. It also allows us to replace one small part of the overall process without breaking the general approach to analyzing logs.

The resulting object from the next stage of parsing will be a NamedTuple subclass, AccessDetails, which wraps the original Access tuple. It will have some additional fields for the details parsed separately:

from typing import NamedTuple, Optional 
import datetime 
import urllib.parse 
 
class AccessDetails(NamedTuple): 
    access: Access 
    time: datetime.datetime 
    method: str 
    url: urllib.parse.ParseResult 
    protocol: str 
    referrer: urllib.parse.ParseResult 
    agent: dict[str, str] 
 
    @classmethod 
    def create(cls: type, access: Access) -> "AccessDetails": 
          meth, url, protocol = parse_request(access.request) 
          return AccessDetails( 
              access=access, 
              time=parse_time(access.time), 
              method=meth, 
              url=urllib.parse.urlparse(url), 
              protocol=protocol, 
              referrer=urllib.parse.urlparse(access.referer), 
              agent=parse_agent(access.user_agent) 
          )

The access attribute is the original Access object, a collection of simple strings. The time attribute is the parsed access.time string. The method, url, and protocol attributes come from decomposing the access.request field. The referrer attribute is a parsed URL.

The agent attribute can also be broken down into fine-grained fields. The rules are quite complex, and we’ve decided that a dictionary mapping names to their associated values will be good enough.

Here are the three detail-level parsers for the fields to be decomposed:

from typing import Optional 
import datetime 
import re 
 
def parse_request(request: str) -> tuple[str, str, str]: 
    words = request.split() 
    return words[0], ' '.join(words[1:-1]), words[-1] 
 
def parse_time(ts: str) -> datetime.datetime: 
    return datetime.datetime.strptime( 
        ts, "%d/%b/%Y:%H:%M:%S %z" 
    ) 
 
def parse_agent(user_agent: str) -> dict[str, str]: 
    agent_pat = re.compile( 
        r"(?P<product>S*?)s+" 
        r"((?P<system>.*?))s*" 
        r"(?P<platform_details_extensions>.*)" 
    ) 
 
    if agent_match := agent_pat.match(user_agent): 
        return agent_match.groupdict() 
    return {}

We’ve written three parsers for the HTTP request, the time stamp, and the user agent information. The request value in a log is usually a three-word string such as GET /some/path HTTP/1.1. The parse_request() function extracts these three space-separated values. In the unlikely event that the path has spaces in it, we’ll extract the first word and the last word as the method and protocol; all the remaining words are part of the path.
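For example, the behavior with and without embedded spaces in the path looks like this; the paths shown are hypothetical:

>>> parse_request('GET /some/path HTTP/1.1')
('GET', '/some/path', 'HTTP/1.1')
>>> parse_request('GET /odd path/with spaces HTTP/1.1')
('GET', '/odd path/with spaces', 'HTTP/1.1')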

Time parsing is delegated to the datetime module. We’ve provided the proper format in the parse_time() function.

Parsing the user agent is challenging. There are many variations; we’ve chosen a common one for the parse_agent() function. If the user agent text matches the given regular expression, we’ll use the dictionary of named capture groups. If the user agent information doesn’t match the regular expression, we’ll use an empty dictionary instead. The original text will be available in the Access object in either case.

We’ll use these three parsers to build AccessDetails instances from the given Access objects. The main body of the access_detail_iter() function looks like this:

from collections.abc import Iterable, Iterator 
 
def access_detail_iter( 
    access_iter: Iterable[Access] 
) -> Iterator[AccessDetails]: 
    for access in access_iter: 
        yield AccessDetails.create(access)

We’ve used a similar design pattern to the previous access_iter() function. A new object is built from the results of parsing some input object. The new AccessDetails object will wrap the previous Access object. This technique allows us to use immutable objects that nonetheless contain more refined information.

This function is essentially a mapping from a sequence of Access objects to a sequence of AccessDetails objects. Here’s an alternative design using the map() higher-order function:

from collections.abc import Iterable, Iterator 
 
def access_detail_iter_2( 
    access_iter: Iterable[Access] 
) -> Iterator[AccessDetails]: 
    return map(AccessDetails.create, access_iter)

As we move forward, we’ll see that this variation fits in nicely with the way the multiprocessing module works.

In an object-oriented programming environment, these additional parsers might be method functions or properties of a class definition. The advantage of an object-oriented design with lazy parsing methods is that items aren’t parsed unless they’re needed. This particular functional design parses everything, assuming that it’s going to be used.

It’s possible to create a lazy functional design. It can rely on the three parser functions to extract and parse the various elements from a given Access object as needed. Rather than using the details.time attribute, we’d use the parse_time(access.time) function. The syntax is longer, but it ensures that the attribute is only parsed as needed. We could also make it a property that preserves the original syntax. We’ve left this as an exercise for the reader.

14.3.5 Filtering the access details

We’ll look at several filters for the AccessDetails objects. The first is a collection of filters that reject a lot of overhead files that are rarely interesting. The second filter will be part of the analysis functions, which we’ll look at later.

The path_filter() function is a combination of three functions:

  • Exclude empty paths

  • Exclude some specific filenames

  • Exclude files that have a given extension

A flexible design can define each test as a separate first-class, filter-style function. For example, we might have a function such as the following to handle empty paths:

def non_empty_path(detail: AccessDetails) -> bool: 
    path = detail.url.path.split('/') 
    return any(path)

This function ensures that the path contains a name. We can write similar tests for the non_excluded_names() and non_excluded_ext() functions. Names like favicon.ico and robots.txt need to be excluded. Similarly, extensions like .js and .css need to be excluded as well. We’ve left these two additional filters as exercises for the reader.

The entire sequence of filter() functions will look like this:

def path_filter( 
    access_details_iter: Iterable[AccessDetails] 
) -> Iterable[AccessDetails]: 
    non_empty = filter(non_empty_path, access_details_iter) 
    nx_name = filter(non_excluded_names, non_empty) 
    nx_ext = filter(non_excluded_ext, nx_name) 
    yield from nx_ext

This style of stacked filters has the advantage of being slightly easier to expand when we add new filter criteria.

The use of generator functions (such as the filter() function) means that we aren’t creating large intermediate objects. Each of the intermediate variables, non_empty, nx_name, and nx_ext, is a proper lazy generator function; no processing is done until the data is consumed by a client process.

While elegant, this suffers from inefficiency because each function will need to parse the path in the AccessDetails object. In order to make this more efficient, we could wrap a path.split('/') function with the @cache decorator. An alternative is to split the path on the / characters, and save the list in the AccessDetails object.
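Here is a minimal sketch of the first alternative. It assumes a small helper function, split_path(), shared by the various filters; the name and the revised filter, non_empty_path_c(), are purely illustrative:

from functools import cache

@cache
def split_path(path: str) -> tuple[str, ...]:
    # Split a URL path once; later calls with the same path reuse the cached result.
    return tuple(item for item in path.split('/') if item)

def non_empty_path_c(detail: AccessDetails) -> bool:
    return bool(split_path(detail.url.path))

The other filters can call split_path() in the same way, so the path is decomposed once per distinct path value rather than once per filter.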

14.3.6 Analyzing the access details

We’ll look at two analysis functions that we can use to filter and analyze the individual AccessDetails objects. The first function will filter the data and pass only specific paths. The second function will summarize the occurrences of each distinct path.

We’ll define a small book_in_path() function and combine this with the built-in filter() function to apply the function to the details. Here is the composite book_filter() function:

from collections.abc import Iterable, Iterator 
 
def book_filter( 
    access_details_iter: Iterable[AccessDetails] 
) -> Iterator[AccessDetails]: 
    def book_in_path(detail: AccessDetails) -> bool: 
        path = tuple( 
            item 
            for item in detail.url.path.split('/') 
            if item 
        ) 
        return path[0] == 'book' and len(path) > 1 
    return filter(book_in_path, access_details_iter)

We’ve defined a rule, through the book_in_path() function, which we’ll apply to each AccessDetails object. If the path has at least two components and the first component of the path is 'book', then we’re interested in these objects. All other AccessDetails objects can be quietly rejected.

The reduce_book_total() function is the final reduction that we’re interested in:

from collections import Counter 
 
def reduce_book_total( 
    access_details_iter: Iterable[AccessDetails] 
) -> dict[str, int]: 
    counts: Counter[str] = Counter( 
        detail.url.path for detail in access_details_iter 
    ) 
    return counts

This function will produce a Counter() object that shows the frequency of each path in an AccessDetails object. In order to focus on a particular set of paths, we’ll use the reduce_book_total(book_filter(details)) expression. This provides a summary of only items that are passed by the given filter.

Because Counter objects can count values of a wide variety of types, a type hint is required to provide a narrow specification. In this case, the hint is dict[str, int] to show the mypy tool that string representations of paths will be counted.

14.3.7 The complete analysis process

Here is the composite analysis() function that digests a collection of log files:

def analysis(log_path: Path) -> dict[str, int]: 
    """Count book chapters in a given log""" 
    details = access_detail_iter( 
        access_iter( 
            local_gzip(log_path))) 
    books = book_filter(path_filter(details)) 
    totals = reduce_book_total(books) 
    return totals

The analysis() function uses the local_gzip() function to work with a single path. It applies a stack of parsing functions, access_detail_iter() and access_iter(), to create an iterable sequence of AccessDetails objects. It then applies a stack of filters to exclude paths that aren’t interesting. Finally, it applies a reduction to a sequence of AccessDetails objects. The result is a Counter object that shows the frequency of access for certain paths.

A sample collection of saved .gzip format log files totals about 51 MB. Processing the files serially with this function takes over 140 seconds. Can we do better using concurrent processing?

14.4 Using a multiprocessing pool for concurrent processing

One elegant way to make use of the multiprocessing module is to create a processing Pool object and assign work to the various workers in that pool. We will depend on the OS to interleave execution among the various processes. If each of the processes has a mixture of I/O and computation, we should be able to ensure that our processor (and disk) are kept very busy. When processes are waiting for the I/O to complete, other processes can do their computations. When an I/O operation finishes, the process waiting for this will be ready to run and can compete with others for processing time.

The recipe for mapping work to a separate process looks like this:

def demo_mp(root: Path = SAMPLE_DATA, pool_size: int | None = None) -> None: 
    pool_size = ( 
        multiprocessing.cpu_count() if pool_size is None 
        else pool_size 
    ) 
    combined: Counter[str] = Counter() 
    with multiprocessing.Pool(pool_size) as workers: 
        file_iter = list(root.glob(LOG_PATTERN)) 
        results_iter = workers.imap_unordered(analysis, file_iter) 
        for result in results_iter: 
            combined.update(result) 
    print(combined)

This function creates a Pool object with separate worker processes and assigns this Pool object to the workers variable. We then map the analytical function, analysis, to an iterable queue of work to be done using the pool of processes. Each process in the workers pool gets assigned items from the iterable queue. In this case, the queue is built from the root.glob(LOG_PATTERN) expression, which yields a sequence of file names.

As each worker completes the analysis() function and returns a result, the parent process that created the Pool object can collect those results. This allows us to create several concurrently built Counter objects and to merge them into a single, composite result.

If we start p processes in the pool, our overall application will include p + 1 processes. There will be one parent process and p children. This often works out well because the parent process will have little to do after the subprocess pools are started. Generally, the workers will be assigned to separate CPUs (or cores) and the parent will share a CPU with one of the children in the Pool object.

The ordinary Linux parent/child process rules apply to the subprocesses created by this module. If the parent crashes without properly collecting the final status from the child processes, then zombie processes can be left running. For this reason, a process Pool object is also a context manager. When we use a pool through the with statement, at the end of the context, the children are properly collected.

By default, a Pool object will have a number of workers based on the value of the multiprocessing.cpu_count() function. This number is often optimal, and simply using the with multiprocessing.Pool() as workers: statement might be sufficient.

In some cases, it can help to have more workers than CPUs. This might be true when each worker has I/O-intensive processing. Having many worker processes waiting for I/O to complete can improve the overall runtime of an application.

If a given Pool object has p workers, this mapping can cut the processing time to almost 1/p of the time required to process all of the logs serially. Pragmatically, there is some overhead involved with communication between the parent and child processes in the Pool object. These overheads will limit the effectiveness of subdividing the work into very small concurrent pieces.

The multiprocessing Pool object has several map-like methods to allocate work to a pool. We’ll look at map(), imap(), imap_unordered(), and starmap(). Each of these is a variation on the common theme of assigning a function to a pool of processes and mapping data items to that function. Additionally, there are two async variants: map_async() and starmap_async(). These functions differ in the details of allocating work and collecting results:

  • The map(function, iterable) method allocates items from the iterable to each worker in the pool. The finished results are collected in the order they were allocated to the Pool object so that order is preserved.

  • The imap(function, iterable) method is lazier than map(). By default, it sends each individual item from the iterable to the next available worker. This might involve more communication overhead. For this reason, a chunk size larger than 1 is suggested.

  • The imap_unordered(function, iterable) method is similar to the imap() method, but the order of the results is not preserved. Allowing the mapping to be processed out of order means that, as each process finishes, the results are collected.

  • The starmap(function, iterable) method is similar to the itertools.starmap() function. Each item in the iterable must be a tuple; the tuple is passed to the function using the * modifier so that each value of the tuple becomes a positional argument value. In effect, it’s performing function(*iterable[0]), function(*iterable[1]), and so on.

The two _async variants don’t simply return a result; they return an AsyncResult object. This object has some status information. We can, for example, see if the work has been completed in general, or if it has been completed without an exception. The most important method of an AsyncResult object is the .get() method, which interrogates the worker for the result.

This extra complexity works well when the duration of processing is highly variable. We can collect results from workers as the results become available. The behavior for the non-_async variants is to collect results in the order the work was started, preserving the order of the original source data for the map-like operation.

Here is the map_async() variant of the preceding mapping theme:

def demo_mp_async(root: Path = SAMPLE_DATA, pool_size: int | None = None) -> None: 
    pool_size = ( 
        multiprocessing.cpu_count() if pool_size is None 
        else pool_size 
    ) 
    combined: Counter[str] = Counter() 
    with multiprocessing.Pool(pool_size) as workers: 
        file_iter = root.glob(LOG_PATTERN) 
        results = workers.map_async(analysis, file_iter) 
        for result in results.get(): 
            combined.update(result) 
    print(combined)

We’ve created a Counter() object that we’ll use to consolidate the results from each worker in the pool. We created a pool of subprocesses based on the number of available CPUs, and used the Pool object as a context manager. We then mapped our analysis() function to each file in our file-matching pattern. The resulting Counter objects from the analysis() function are combined into a single resulting counter.

This version took about 68 seconds to analyze a batch of log files. The time to analyze the logs was cut dramatically using several concurrent processes. The single-process baseline time was 150 seconds. Other experiments need to be run with larger pool sizes to determine how many workers are required to make the system as busy as possible.

We’ve created a two-tiered map-reduce process with the multiprocessing module’s Pool.map_async() function. The first tier was the analysis() function, which performed a map-reduce on a single log file. We then consolidated these reductions in a higher-level reduce operation.

14.4.1 Using apply() to make a single request

In addition to the map-like variants, a pool also has an apply(function, *args, **kw) method that we can use to pass one value to the worker pool. We can see that the various map() methods are really a for statement wrapped around the apply() method. We can, for example, use the following command to process a number of files:

list( 
    workers.apply(analysis, (f,)) 
    for f in SAMPLE_DATA.glob(LOG_PATTERN) 
)

It’s not clear, for our purposes, that this is a significant improvement. Almost everything we need to do can be expressed as a map() function.

14.4.2 More complex multiprocessing architectures

The multiprocessing package supports a wide variety of architectures. We can create multiprocessing structures that span multiple servers and provide formal authentication techniques to create a necessary level of security. We can pass objects from process to process using queues and pipes. We can share memory between processes. We can also share lower-level locks between processes as a way to synchronize access to shared resources such as files.

Most of these architectures involve explicitly managing states among several working processes. Using locks and shared memory, in particular, is imperative in nature and doesn’t fit in well with a functional programming approach.

We can, with some care, treat queues and pipes in a functional manner. Our objective is to decompose a design into producer and consumer functions. A producer can create objects and insert them into a queue. A consumer will take objects out of a queue and process them, perhaps putting intermediate results into another queue. This creates a network of concurrent processors and the workload is distributed among these various processes.
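Here is a minimal sketch of this producer and consumer decomposition. It is not part of the log analysis example; the function and file names are purely illustrative. A sentinel value of None marks the end of the data:

import multiprocessing

def producer(paths: list[str], queue: multiprocessing.Queue) -> None:
    # Put each work item into the queue, then a sentinel to mark the end of data.
    for path in paths:
        queue.put(path)
    queue.put(None)

def consumer(queue: multiprocessing.Queue) -> None:
    # Take items until the sentinel arrives; nothing is shared with the producer.
    while (item := queue.get()) is not None:
        print(f"processing {item!r}")

if __name__ == "__main__":
    work_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=consumer, args=(work_queue,))
    worker.start()
    producer(["a.log.gz", "b.log.gz", "c.log.gz"], work_queue)
    worker.join()

Each stage of a longer pipeline can follow the same pattern, taking items from one queue and putting intermediate results into another.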

This design technique has some advantages when designing a complex application server. The various subprocesses can exist for the entire life of the server, handling individual requests concurrently.

14.4.3 Using the concurrent.futures module

In addition to the multiprocessing package, we can also make use of the concurrent.futures module. This also provides a way to map data to a concurrent pool of threads or processes. The module API is relatively simple and similar in many ways to the multiprocessing.Pool() function’s interface.

Here is an example to show how similar they are:

def demo_cf_threads(root: Path = SAMPLE_DATA, pool_size: int = 4) -> None: 
    pattern = "*itmaybeahack.com*.gz" 
    combined: Counter[str] = Counter() 
    with futures.ProcessPoolExecutor(max_workers=pool_size) as workers: 
        file_iter = root.glob(LOG_PATTERN) 
        for result in workers.map(analysis, file_iter): 
            combined.update(result) 
    print(combined)

The most significant change between the preceding example and the previous examples is that we’re using an instance of the concurrent.futures.ProcessPoolExecutor object instead of a multiprocessing.Pool object. The essential design pattern is to map the analysis() function to the list of filenames using the pool of available workers. The resulting Counter objects are consolidated to create a final result.

The performance of the concurrent.futures module is nearly identical to the multiprocessing module.

14.4.4 Using concurrent.futures thread pools

The concurrent.futures module offers a second kind of executor that we can use in our applications. Instead of creating a concurrent.futures.ProcessPoolExecutor object, we can use the ThreadPoolExecutor object. This will create a pool of threads within a single process.

The syntax for thread pools is almost identical to using a ProcessPoolExecutor object. The performance, however, can be remarkably different. CPU-intensive processing rarely shows improvement in a multithreaded environment because the GIL prevents computations from proceeding in parallel. Processing that is I/O-intensive, however, can benefit from multithreading: some threads can compute while other threads wait for I/O to complete.

Using sample log files and a small four-core laptop running macOS, these are the kinds of results that indicate the difference between threads that share I/O resources and processes:

  • Using the concurrent.futures thread pool, the elapsed time was 168 seconds

  • Using a process pool, the elapsed time was 68 seconds

In both cases, the pool size was 4. The single-process and single-thread baseline time was 150 seconds; adding threads made processing run more slowly. This result is typical of programs doing a great deal of computation with relatively little waiting for input and output. Multithreading is often more appropriate for the following kinds of applications:

  • User interfaces where threads are idle for long periods of time, while waiting for the person to move the mouse or touch the screen

  • Web servers where threads are idle while waiting for data to transfer from a large, fast server through a network to a (relatively) slow client

  • Web clients that extract data from multiple web servers, especially where these clients must wait for data to percolate through a network

It’s important to benchmark and measure performance.

14.4.5 Using the threading and queue modules

The Python threading package involves a number of constructs helpful for building imperative applications. This module is not focused on writing functional applications. We can make use of thread-safe queues in the queue module to pass objects from thread to thread.

A queue permits safe data sharing. Since the queue processing involves using OS services, it can also mean applications using queues may observe less interference from the GIL.

The threading module doesn’t have a simple way of distributing work to various threads. The API isn’t ideally suited to functional programming.

As with the more primitive features of the multiprocessing module, we can try to conceal the stateful and imperative nature of locks and queues. It seems easier, however, to make use of the ThreadPoolExecutor class in the concurrent.futures module. The ThreadPoolExecutor.map() method provides us with a very pleasant interface to concurrently process the elements of a collection.

The use of the map() function primitive to allocate work seems to fit nicely with our functional programming expectations. For this reason, it’s best to focus on the concurrent.futures module as the most accessible way to write concurrent functional programs.
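Here is a minimal sketch, separate from the log analysis example, showing how the ThreadPoolExecutor.map() method applies an I/O-intensive function to a collection of items; the URLs are arbitrary examples:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def content_length(url: str) -> tuple[str, int]:
    # An I/O-bound task: most of the time is spent waiting on the network.
    with urllib.request.urlopen(url) as response:
        return url, len(response.read())

if __name__ == "__main__":
    urls = [
        "https://www.python.org",
        "https://docs.python.org/3/",
    ]
    with ThreadPoolExecutor(max_workers=4) as workers:
        for url, size in workers.map(content_length, urls):
            print(f"{url}: {size:,d} bytes")

Because each call spends most of its time waiting on the network, the threads overlap their waits, and the GIL is not a significant constraint here.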

14.4.6 Using async functions

The asyncio module helps us work with async functions to—perhaps—better interleave processing and computation. It’s important to understand that async processing, like threading, interleaves waiting for I/O with computation; the interleaving happens cooperatively within a single thread. It does not effectively interleave pure computation.

In order to make use of the asyncio module, we need to do the following four things:

  1. Add the async keyword to our various parsing and filtering functions to make them coroutines.

  2. Add await keywords to collect results from one coroutine before passing them to another coroutine.

  3. Create an overall event loop to coordinate the async/await processing among the coroutines.

  4. Create a thread pool to handle file reading.

The first three steps listed above don’t involve deep complexity. The asyncio module helps us create tasks to parse each file, and then run the collection of tasks. The event loop ensures that coroutines will pause at the await statements to collect results. It also ensures coroutines with available data are eligible to process. The interleaving of the coroutines happens in a single thread. As noted previously, the number of bytecode operations is not magically made smaller by changing the order of execution.

The tricky part of this is dealing with input and output operations that are not part of the asyncio module. Specifically, reading and writing local files is not part of asyncio. Any time we attempt to read (or write) a file, the operating system request could block waiting for the operation to complete. Unless this blocking request is in a separate thread, it stops the event loop, and stops all of Python’s cleverly interleaved coroutine processing. See https://docs.python.org/3/library/asyncio-eventloop.html#id14 for more information on using a thread pool.

To work with local files, we would need to use a concurrent.futures.ThreadPoolExecutor object to manage the file input and output operations. This will allocate the work to threads outside the main event loop. Consequently, a design for local file processing based on async/await will not be dramatically better than one using concurrent.futures directly.
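Here is a minimal sketch of this combination, assuming Python 3.9 or later. The asyncio.to_thread() function hands the blocking file read to a worker thread behind the scenes, keeping the event loop free; the file names are purely illustrative:

import asyncio
from pathlib import Path

async def count_lines(path: Path) -> tuple[Path, int]:
    # Delegate the blocking file read to a worker thread so the event loop stays free.
    text = await asyncio.to_thread(path.read_text)
    return path, len(text.splitlines())

async def main(paths: list[Path]) -> None:
    results = await asyncio.gather(*(count_lines(p) for p in paths))
    for path, count in results:
        print(f"{path}: {count} lines")

if __name__ == "__main__":
    asyncio.run(main([Path("a.log"), Path("b.log")]))

For compute-intensive per-file work, an explicit ProcessPoolExecutor can be supplied to the event loop’s run_in_executor() method instead.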

For network servers and complex clients, the asyncio module can make the application very responsive to a user’s inputs. The fine-grained switching among the coroutines within a thread works best when most of the coroutines are waiting for data.

14.4.7 Designing concurrent processing

From a functional programming perspective, we’ve seen three ways to use the map() function concept applied to data items concurrently. We can use any one of the following:

  • multiprocessing.Pool

  • concurrent.futures.ProcessPoolExecutor

  • concurrent.futures.ThreadPoolExecutor

These are almost identical in the way we interact with them; all three of these process pools support variations of a map() method that applies a function to items of an iterable collection. This fits in elegantly with other functional programming techniques. The performance of each pool may be different because of the nature of concurrent threads versus concurrent processes.

As we stepped through the design, our log analysis application decomposed into two overall areas:

  • The lower-level parsing: This is generic parsing that will be used by almost any log analysis application

  • The higher-level analysis application: This is more specific filtering and reduction focused on our application’s needs

The lower-level parsing can be decomposed into four stages:

  1. Reading all the lines from multiple source log files. This was the local_gzip() mapping from file name to a sequence of lines.

  2. Creating named tuples from the lines of log entries in a collection of files. This was the access_iter() mapping from text lines to Access objects.

  3. Parsing the details of more complex fields such as dates and URLs. This was the access_detail_iter() mapping from Access objects to AccessDetails objects.

  4. Rejecting uninteresting paths from the logs. We can also think of this as passing only the interesting paths. This was more of a filter than a map operation. This was a collection of filters bundled into the path_filter() function.

We defined an overall analysis() function that parsed and analyzed a given log file. It applied the higher-level filter and reduction to the results of the lower-level parsing. It can also work with a wildcard collection of files.

Given the number of mappings involved, we can see several ways to decompose this problem into work designed to use a pool of threads or processes. Each mapping is an opportunity for concurrent processing. Here are some of the mappings we can consider as design alternatives:

  • Map the analysis() function to individual files. We used this as a consistent example throughout this chapter.

  • Refactor the local_gzip() function out of the overall analysis() function. This refactoring permits mapping a revised analysis() function to the results of the local_gzip() function.

  • Refactor the access_iter(local_gzip(pattern)) function out of the overall analysis() function. This revised analysis() function can be applied via map() to the iterable sequence of the Access objects.

  • Refactor the access_detail_iter(access_iter(local_gzip(pattern))) function into two separate iterables. This permits using map() to apply one function to create AccessDetail objects. A separate, higher-level filter and reduction against the iterable sequence of the AccessDetail objects can be a separate process.

  • We can also refactor the lower-level parsing into a function to keep it separate from the higher-level analysis. We can map the analysis filter and reduction against the output from the lower-level parsing.

All of these are relatively simple methods to restructure the example application. The benefit of using functional programming techniques is that each part of the overall process can be defined as a mapping, a filter, or a reduction. This makes it practical to consider different architectures to locate an optimal design.

In this case, however, we need to distribute the I/O processing to as many CPUs or cores as we have available. Most of these potential refactorings will perform all of the I/O in the parent process; these will only distribute the computation portions of the work to multiple concurrent processes, with little resulting benefit. Because of this, we want to focus on mappings that distribute the I/O to as many cores as possible.

It’s often important to minimize the amount of data being passed from process to process. In this example, we provided just short filename strings to each worker process. The resulting Counter object was considerably smaller than the 10 MB of compressed detail data in each log file.

It’s also essential to run benchmarking experiments to confirm the actual timing between computation, input, and output. This information is essential to uncover optimal allocation of resources, and a design that better balances computation against waiting for I/O to complete.

The following table contains some preliminary results:



Approach                              Duration
concurrent.futures / thread pool      106.58s
concurrent.futures / process pool      40.81s
multiprocessing / imap_unordered       27.26s
multiprocessing / map_async            27.45s


We can see that a thread pool doesn’t permit any useful parallelization of the work. This is not unexpected, and provides a kind of worst-case benchmark.

The concurrent.futures/processpool row shows the time with 4 workers. This variant used the map() method to parcel out requests to the workers. The need to process the work and collect the results in a specific order may have caused relatively slow processing.

The multiprocessing module runs used the default number of workers, which is 8 for the computer being used. The time was cut almost to 1/4 of the baseline time. In order to make better use of the available processors, it might make sense to further decompose the processing to create batches of lines for analysis, and have separate worker pools for analysis and file parsing. Because the workloads are very difficult to predict, a flexible, functional design allows the restructuring of the work, searching for a way to maximize CPU use.

14.5 Summary

In this chapter, we’ve looked at two ways to support the concurrent processing of multiple pieces of data:

  • The multiprocessing module: Specifically, the Pool class and the various kinds of mappings available to a pool of workers.

  • The concurrent.futures module: Specifically, the ProcessPoolExecutor and ThreadPoolExecutor classes. These classes also support a mapping that will distribute work among workers that are threads or processes.

We’ve also noted some alternatives that don’t seem to fit in well with functional programming. There are numerous other features of the multiprocessing module, but they’re not a good fit with functional design. Similarly, the threading and queue modules can be used to build multithreaded applications, but the features aren’t a good fit with functional programs.

In the next chapter, we’ll look at how we can apply functional programming techniques to build web service applications. The idea of HTTP can be summarized as response = httpd(request). When the HTTP processing is stateless, this seems to be a perfect match for functional design.

Adding stateful cookies to this is analogous to providing a response value which is expected as an argument to a later request. We can think of it as response, cookie = httpd(request, cookie), where the cookie object is opaque to the client.

14.6 Exercises

This chapter’s exercises are based on code available from Packt Publishing on GitHub. See https://github.com/PacktPublishing/Functional-Python-Programming-3rd-Edition.

In some cases, the reader will notice that the code provided on GitHub includes partial solutions to some of the exercises. These serve as hints, allowing the reader to explore alternative solutions.

In many cases, exercises will need unit test cases to confirm they actually solve the problem. These are often identical to the unit test cases already provided in the GitHub repository. The reader should replace the book’s example function name with their own solution to confirm that it works.

14.6.1 Lazy parsing

In the Parsing additional fields of an Access object section, we looked at a function that did the initial decomposition of a Common Log Format (CLF) line into a set of easy-to-separate fields.

We then applied three separate functions to parse the details of the time, the request, and the user agent information. These three functions were applied eagerly, decomposing these three fields even if they were never used for further analysis.

There are two commonly-used ways to implement lazy parsing of these fields:

  • Rather than parse the text to create a details.time attribute, we can define a parse_time() method to parse the access.time value. The syntax is longer, but it ensures that the attribute is only parsed as needed.

  • Once we have this function, we can make it into a property.

First, define a revised Access_Details class that uses three separate methods to parse the complex fields.

Once this works, make these methods into properties to provide values as if they had been parsed eagerly. Make sure the new property method names match the original attribute names in the class shown earlier.
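One possible shape for the lazy variant is sketched below; the raw_time attribute name and the timestamp format are assumptions, and functools.cached_property is used here as a shortcut that combines the method and property steps.

import datetime
from dataclasses import dataclass
from functools import cached_property

@dataclass
class Access_Details:
    # Only the raw text is stored eagerly; parsing happens on first access.
    raw_time: str

    @cached_property
    def time(self) -> datetime.datetime:
        # Parsed lazily, and only once, when the attribute is first used.
        return datetime.datetime.strptime(self.raw_time, "%d/%b/%Y:%H:%M:%S %z")

if __name__ == "__main__":
    details = Access_Details(raw_time="25/Dec/2021:13:42:11 +0000")
    print(details.time.year)  # parsing happens here, not at construction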

To compare the performance, we need to know how often these additional property parsing methods are used. Two simple assumptions are 100% of the time and 0% of the time. To compare the two designs, we’ll need some statistical summary functions that work with the Access_Details objects.

Create a function that fetches the values of all attributes to compute a number of histograms, for example. Create another that uses only the status value to compute a histogram of statuses. Compare the performance of the two Access_Details class variants and the two analytic approaches to see which is faster. The expectation is that lazy parsing will be faster. The question is “how much faster?”
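The two analytic functions might start from something like the following sketch; the status and agent attribute names are assumptions about the Access_Details class.

from collections import Counter
from collections.abc import Iterable

def full_histograms(details: Iterable["Access_Details"]) -> dict[str, Counter]:
    # Touches every (lazily parsed) attribute, forcing all parsing work.
    histograms: dict[str, Counter] = {"status": Counter(), "agent": Counter()}
    for d in details:
        histograms["status"][d.status] += 1
        histograms["agent"][d.agent] += 1
    return histograms

def status_histogram(details: Iterable["Access_Details"]) -> Counter:
    # Touches only the status value; lazy parsing of other fields never runs.
    return Counter(d.status for d in details)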

14.6.2 Filter access path details

In the Filtering the access details section of this chapter, we showed a function to exclude empty paths from further analysis.

We can write similar filter functions, non_excluded_names() and non_excluded_ext(). Names like 'favicon.ico' and 'robots.txt' need to be excluded. Similarly, extensions like '.js' and '.css' need to be excluded.

Write these two functions to complete the implementation of the path_filter() function. These require some unit test cases, as does the overall path_filter() function that exploits three separate path function filters.

All of these functions work with a decomposed path name. Is it sensible to try to write a single, complex function for all three operations? Does it make more sense to decompose the three separate rules and combine them through an overall path filtering function?
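One possible starting point is sketched below; the assumption that each filter receives the path as a tuple of names is an illustration, not the signature used in the chapter.

def non_excluded_names(path: tuple[str, ...]) -> bool:
    # Reject paths whose final name is bookkeeping noise, not content.
    excluded = {"favicon.ico", "robots.txt"}
    return bool(path) and path[-1] not in excluded

def non_excluded_ext(path: tuple[str, ...]) -> bool:
    # Reject stylesheet and script requests by extension.
    excluded = {".js", ".css"}
    return bool(path) and not any(path[-1].endswith(ext) for ext in excluded)

if __name__ == "__main__":
    print(non_excluded_names(("blog", "post.html")))  # True
    print(non_excluded_ext(("static", "site.css")))   # False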

14.6.3 Add @cache decorators

The implementation of the path_filter() function applies three separate filters. Each filter function will parse the path in the AccessDetails object. In order to make this more efficient, it can help to wrap lower-level parsing, like a path.split('/') operation, with the @cache decorator.

Write (or rewrite) these three filter functions to make use of the @cache decorator.

Be sure to compare performance of the filter functions with caching and without caching. This can be challenging because when we use a simple @cache decorator, the original, uncached function is no longer available.

If, on the other hand, we use something like func_c = cache(func), we can preserve both the original (uncached) function and the counterpart with caching. See Chapter 12, Decorator Design Techniques, for more on how this works. Doing this lets us gather timing data for cached and uncached implementations.
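A sketch of this approach follows, with a hypothetical split_path() helper standing in for the lower-level parsing; both the uncached and cached callables stay available for timing.

import timeit
from functools import cache

def split_path(path: str) -> tuple[str, ...]:
    # Uncached lower-level parsing used by the filter functions.
    return tuple(name for name in path.split("/") if name)

# Keep both the original and the cached variant in scope.
split_path_c = cache(split_path)

if __name__ == "__main__":
    sample = "/blog/2021/12/25/index.html"
    uncached = timeit.timeit(lambda: split_path(sample), number=100_000)
    cached = timeit.timeit(lambda: split_path_c(sample), number=100_000)
    print(f"uncached {uncached:.3f}s, cached {cached:.3f}s")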

14.6.4 Create sample data

The design shown uses a mapping from filenames to summary counts. Each file is processed concurrently by a pool of workers. In order to determine if this is optimal, it’s essential to have a high volume of data to measure performance.

For a lightly-used website, the log files can average about 10 MB per month. Write a Python script to generate synthetic log rows in batches averaging about 10 MB per file. Using simplistic random strings isn’t the best approach, because the application design expects the request path to have a recognizable pattern. This requires some care to generate synthetic data that fits the expected pattern.

The application to create synthetic data needs some unit test cases. The overall analysis application is the final acceptance test case: does the analysis application identify the data patterns built into the synthetic rows of log entries?
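One possible shape for the generator is sketched below; the path list, the status weights, and the fixed timestamp are simplifying assumptions, and a more realistic generator would vary the timestamps as well.

import random

PATHS = ["/index.html", "/blog/post1.html", "/favicon.ico", "/static/site.css"]
STATUS = ["200", "200", "200", "404", "301"]  # weighted toward success

def synthetic_line(rng: random.Random) -> str:
    # One simplified Common Log Format row with a recognizable path pattern.
    host = f"192.168.1.{rng.randint(1, 254)}"
    path = rng.choice(PATHS)
    status = rng.choice(STATUS)
    size = rng.randint(200, 5_000)
    return f'{host} - - [25/Dec/2021:13:42:11 +0000] "GET {path} HTTP/1.1" {status} {size}'

def write_sample(filename: str, target_bytes: int = 10_000_000, seed: int = 42) -> None:
    # Emit rows until the file reaches approximately the target size.
    rng = random.Random(seed)
    written = 0
    with open(filename, "w") as target:
        while written < target_bytes:
            line = synthetic_line(rng) + "\n"
            target.write(line)
            written += len(line)

if __name__ == "__main__":
    write_sample("synthetic_0.log", target_bytes=10_000)  # small demo file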

14.6.5 Change the pipeline structure

For a lightly-used website, the log files can average about 10 MB per month. Using Python 3.10 on a MacBook Pro, each file takes about 16 seconds to process. A collection of six 10 MB files has a worst-case, fully serial processing time of 96 seconds. On a computer with six or more cores, the best case would be 16 seconds.

The design shown in this chapter allocates each file to a separate worker.

Is this the right level of granularity? It’s impossible to know without exploring alternatives. This requires sample data files created by the previous exercise. Consider implementing alternative designs and comparing throughput. Here are some suggested alternatives:

  • Create two pools of workers: one pool reads files and returns lines in blocks of 1,024. The second pool of workers comprises the bulk of the analysis() function. This second pool has workers to parse each line in a block to create an Access object, create an AccessDetails object, apply the filters, and summarize the results. This leads to two tiers of mapping to pass work from the parsing workers to the analysis workers.

  • Decompose the 10 Mb log files into smaller sizes. Write an application to read a log file and write new files, each of which is limited to 4,096 individual log entries. Apply the analysis application to this larger collection of small files instead of the smaller collection of the original large log files.

  • Decompose the analysis() function to use three separate pools of workers. One pool parses files and returns blocks of Access objects. Another pool transforms Access objects into AccessDetails objects. The third pool of workers applies filters and summarizes the AccessDetails objects.

Summarize the results of using distinct processing pipelines to analyze large volumes of data.
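As a starting point for the second alternative, the following sketch splits one log file into files of at most 4,096 entries; the output naming scheme is an arbitrary choice, and the input file could come from the previous exercise.

from itertools import islice
from pathlib import Path

def split_log(source: Path, batch_size: int = 4_096) -> list[Path]:
    # Write new files, each limited to batch_size individual log entries.
    outputs: list[Path] = []
    with source.open() as lines:
        batches = iter(lambda: list(islice(lines, batch_size)), [])
        for number, batch in enumerate(batches):
            target = source.parent / f"{source.stem}_{number:04d}.log"
            target.write_text("".join(batch))
            outputs.append(target)
    return outputs

if __name__ == "__main__":
    print(split_log(Path("synthetic_0.log")))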
