Introducing the concurrent.futures module

In this section, we explain the concurrent.futures module, which provides a simplified layer on top of the threading and multiprocessing modules.

concurrent.futures is part of the Python standard library and provides a high-level abstraction layer in which work running in threads or processes is modeled as asynchronous tasks.

The term future is synonymous with promise, delay, or deferred when working with asynchronous tasks. Whatever you call it, you can think of it as a pending result. A future is a placeholder for a result that is not yet available, usually because its computation has not finished or its transfer over the network has not completed.
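
The idea of a pending result can be sketched in a few lines. The following is a minimal illustration, not part of the book's example: the slow_square function and the gate event are hypothetical names, introduced only to keep the task unfinished until we choose to release it.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def slow_square(n, gate):
    # Hypothetical task: wait until the caller opens the gate, then compute
    gate.wait()
    return n * n

gate = threading.Event()
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7, gate)  # returns immediately
    pending = future.done()    # False: the worker is still blocked on the gate
    gate.set()                 # let the task run to completion
    value = future.result()    # blocks until the result is available
    finished = future.done()   # True: the future now holds its result

print(pending, value, finished)  # False 49 True
```

The future exists as soon as submit() returns, but its result only becomes available once the task has run.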

The module has an abstract base class called Executor, from which the ThreadPoolExecutor (used for multithreading) and ProcessPoolExecutor (used for multiprocessing) subclasses are derived. Both accept a max_workers parameter, which sets the maximum number of workers that execute calls asynchronously:

  • concurrent.futures.ThreadPoolExecutor(max_workers)
  • concurrent.futures.ProcessPoolExecutor(max_workers)
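
Because both classes derive from Executor, code written against one of them can usually be pointed at the other. The sketch below assumes two hypothetical helpers, word_length and measure, that are not part of the book's code. It only runs the thread-based pool; passing ProcessPoolExecutor to the same helper should also work, provided the call is made from a script guarded by if __name__ == '__main__'.

```python
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor

def word_length(word):
    # Hypothetical task used only for illustration
    return len(word)

def measure(executor_class, words, max_workers=2):
    # Any Executor subclass offers the same submit()/map() interface
    with executor_class(max_workers=max_workers) as executor:
        # map() yields the results in the same order as the inputs
        return list(executor.map(word_length, words))

# Both pool classes share the Executor base class
print(issubclass(ThreadPoolExecutor, Executor))   # True
print(issubclass(ProcessPoolExecutor, Executor))  # True

lengths = measure(ThreadPoolExecutor, ['thread', 'process', 'pool'])
print(lengths)  # [6, 7, 4]
```

Using the executor in a with block shuts the pool down automatically once all submitted work has finished.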

The approach we adopt here uses a ThreadPoolExecutor. We submit tasks to the pool and immediately get back futures, which stand for results that will become available later. We can, of course, wait for a future to turn into an actual result. Let's look at an example of the first subclass, ThreadPoolExecutor, with a practical case that downloads files asynchronously from https://docs.python.org/3/download.html.

You can find the following code in the download_async_files.py file:

#!/usr/bin/python3

from concurrent.futures import ThreadPoolExecutor

import requests
import itertools
import time

docs = ['https://docs.python.org/3/archives/python-3.7.2-docs-pdf-letter.zip',
        'https://docs.python.org/3/archives/python-3.7.2-docs-pdf-a4.zip',
        'https://docs.python.org/3/archives/python-3.7.2-docs-html.zip',
        'https://docs.python.org/3/archives/python-3.7.2-docs-text.zip',
        'https://docs.python.org/3/archives/python-3.7.2-docs.epub'
        ]

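def download_documents(documents, workers=4):
    def get_document(url):
        # Fetch the document and save it under the name taken from the URL path
        response = requests.get(url)
        filename = url.split("/")[5]
        print('Downloading ' + filename)
        # The with block closes the file once the content has been written
        with open(filename, 'wb') as f:
            f.write(response.content)
        return url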

In the previous code block, we define the list of documents to download and the download_documents() function, which takes the document list and the number of workers used by ThreadPoolExecutor as parameters. In the following code block, we define the executor that downloads the documents concurrently:

    message = 'Downloading docs from https://docs.python.org/3/archives'
    symbol = itertools.cycle('|/-')
    executor = ThreadPoolExecutor(max_workers=workers)
    mydocs = [executor.submit(get_document, url) for url in documents]
    # Poll the futures and show a spinner until every download has finished
    while not all([doc.done() for doc in mydocs]):
        # '\r' moves the cursor back to the start of the line so the message overwrites itself
        print(message + next(symbol), end='\r')
        time.sleep(0.1)
    return mydocs

if __name__ == '__main__':
    t1 = time.time()
    print(download_documents(docs, workers=4))
    print(time.time() - t1, 'seconds passed')

This is the output of the previous script. The text trailing each filename is what remains of the progress message printed on the same terminal line, and the final list shows that every future has finished:

Downloading python-3.7.2-docs-text.ziphon.org/3/archives-
Downloading python-3.7.2-docs-pdf-a4.zipn.org/3/archives/
Downloading python-3.7.2-docs-pdf-letter.zipg/3/archives
Downloading python-3.7.2-docs.epub.python.org/3/archives
Downloading python-3.7.2-docs-html.ziphon.org/3/archives/

[<Future at 0x3cc8970 state=finished returned str>, <Future at 0x3ce0430 state=finished returned str>, <Future at 0x3ce07f0 state=finished returned str>, <Future at 0x3ce0bb0 state=finished returned str>, <Future at 0x3ce0f90 state=finished returned str>]

Let's look at each of these steps in detail:

  • Inside the download_documents function, another function is defined: get_document(url). This function requests the file and saves it to the local filesystem.
  • Next, we instantiate ThreadPoolExecutor and create a list, mydocs, in which we store the futures. Instances of the Future class (each element of the mydocs list) encapsulate the asynchronous execution of the callable. Each of these objects is returned by Executor.submit().
  • Within the while block, we ask each download whether it has finished by calling the Future.done() method, which returns True if the call has finished and False otherwise.
  • Finally, we return the mydocs list with the calculated futures.
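
Polling done() in a loop is one way to wait for the futures. As an alternative, the module also provides as_completed(), which yields each future as soon as it finishes. The sketch below is a hedged illustration rather than the book's script: fake_download and the file names are hypothetical, and time.sleep() stands in for the network transfer so the example runs without a connection.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fake_download(name, delay):
    # Hypothetical stand-in for a download: sleep instead of fetching anything
    time.sleep(delay)
    return name

# Hypothetical file names mapped to simulated transfer times in seconds
jobs = {'a.zip': 0.3, 'b.zip': 0.1, 'c.zip': 0.2}

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fake_download, name, delay)
               for name, delay in jobs.items()]
    finished = []
    # as_completed() blocks until the next future is done, so no polling loop is needed
    for future in as_completed(futures):
        finished.append(future.result())
        print('Finished', finished[-1])
```

The order in which the names are printed depends on which task finishes first, so it can differ from the submission order. Calling result() on each future also re-raises any exception from the worker, which the done() loop alone would not reveal.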