The async library

If we take a look for a moment at every control flow pattern we have analyzed so far, we can see that they could be used as a base to build reusable and more generic solutions. For example, we could wrap the unlimited parallel execution algorithm into a function which accepts a list of tasks, runs them in parallel, and invokes the given callback when all of them are complete. This way of wrapping control flow algorithms into reusable functions can lead to a more declarative and expressive way to define asynchronous control flows, and that's exactly what async (https://npmjs.org/package/async) does. The async library is a very popular solution, in Node.js and JavaScript in general, to deal with asynchronous code. It offers a set of functions that greatly simplify the execution of a set of tasks in different configurations and it also provides useful helpers for dealing with collections asynchronously. Even though there are several other libraries with a similar goal, async is a de facto standard in Node.js due to its popularity.

Let's try it straightaway to demonstrate its capabilities.

Sequential execution

The async library can help us immensely when implementing complex asynchronous control flows, but one difficulty with it is choosing the right helper for the problem at hand. For example, for the case of the sequential execution flow, there are around 20 different functions to choose from, including eachSeries(), mapSeries(), filterSeries(), rejectSeries(), reduce(), reduceRight(), detectSeries(), concatSeries(), series(), whilst(), doWhilst(), until(), doUntil(), forever(), waterfall(), compose(), seq(), applyEachSeries(), iterator(), and timesSeries().

Choosing the right function is an important step in writing more compact and readable code, but this also requires some experience and practice. In our examples, we are going to cover just a few of these situations, but they will still provide a solid base to understand and efficiently use the rest of the library.

Now, to show in practice how async works, we are going to adapt our web spider application. Let's start directly with version 2, the one that downloads all the links recursively in sequence.

However, first let's make sure we install the async library into our current project:

npm install async

Then we need to load the new dependency from the spider.js module:

const async = require('async');

Sequential execution of a known set of tasks

Let's modify the download() function first. As we have already seen, it executes the following three tasks in sequence:

  1. Download the contents of a URL.
  2. Create a new directory if it doesn't exist yet.
  3. Save the contents of the URL into a file.

The ideal function to use with this flow is definitely async.series(), which has the following signature:

async.series(tasks, [callback])

It takes a list of tasks and a callback function that is invoked when all the tasks have been completed. Each task is just a function that accepts a callback function, which must be invoked when the task completes its execution:

function task(callback) {} 

The nice thing about async is that it follows the same callback conventions used in Node.js, and it automatically handles error propagation. So, if any of the tasks invokes its callback with an error, async will skip the remaining tasks in the list and jump directly to the final callback.
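To see this behavior in isolation, here is a minimal sketch (unrelated to our web spider) in which the first task fails and the second is never run:

async.series([
  callback => callback(new Error('Boom!')),       // fails immediately
  callback => {
    console.log('This task is never executed');   // skipped by async
    callback();
  }
], err => {
  console.log(err.message);                       // prints 'Boom!'
});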

With this in mind, let's see how the download() function would change by using async:

function download(url, filename, callback) { 
  console.log(`Downloading ${url}`); 
  let body; 
 
  async.series([ 
    callback => {                                    //[1] 
      request(url, (err, response, resBody) => { 
        if(err) { 
          return callback(err); 
        } 
        body = resBody; 
        callback(); 
      }); 
    }, 
 
    mkdirp.bind(null, path.dirname(filename)),       //[2] 
 
    callback => {                                    //[3] 
      fs.writeFile(filename, body, callback); 
    } 
  ], err => {                                        //[4] 
    if(err) { 
      return callback(err); 
    } 
    console.log(`Downloaded and saved: ${url}`); 
    callback(null, body); 
  }); 
} 

If we remember the callback hell version of this code, we will surely appreciate the way async allows us to organize our tasks. There is no need to nest callbacks anymore, as we just have to provide a flat list of tasks, usually one for each asynchronous operation, which async will then execute in sequence. This is how we define each task:

  1. The first task involves the download of the URL. Also, we save the response body into a closure variable (body) so that it can be shared with the other tasks.
  2. In the second task, we want to create the directory that will hold the downloaded page. We do this by performing a partial application of the mkdirp() function, binding the path of the directory to be created. This way, we can save a few lines of code and increase its readability.
  3. Finally, we write the contents of the downloaded URL to a file. In this case, we could not perform a partial application (as we did for the second task), because the variable, body, is only available after the first task in the series completes. However, we can still save some lines of code by exploiting the automatic error management of async: we simply pass the callback of the task directly to the fs.writeFile() function.
  4. After all the tasks are complete, the final callback of async.series() is invoked. In our case, we are simply doing some error management and then returning the body variable to the callback of the download() function.

For this specific situation, a possible alternative to async.series() would be async.waterfall(), which still executes the tasks in sequence but in addition, it also provides the output of each task as input to the next. In our situation, we could use this feature to propagate the body variable until the end of our sequence. As an exercise, you can try to implement the same function using the waterfall flow and then take a look at the differences.
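To give an idea of the direction this exercise might take, here is a minimal sketch of a waterfall-based download(), assuming the same request, mkdirp, path, and fs modules we have used so far; it is just one possible solution, not necessarily the cleanest:

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  async.waterfall([
    callback => {
      // pass the body on to the next task instead of using a closure variable
      request(url, (err, response, body) => callback(err, body));
    },
    (body, callback) => {
      mkdirp(path.dirname(filename), err => callback(err, body));
    },
    (body, callback) => {
      fs.writeFile(filename, body, err => callback(err, body));
    }
  ], (err, body) => {
    if(err) {
      return callback(err);
    }
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}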

Sequential iteration

We saw in the previous paragraph how we can execute a set of known tasks in sequence; we used async.series() to do that. We could use the same functionality to implement the spiderLinks() function of our web spider version 2; however, async offers a more appropriate helper for the specific situation in which we have to iterate over a collection; this helper is async.eachSeries(). Let's use it then to reimplement our spiderLinks() function (version 2, download in series) as follows:

function spiderLinks(currentUrl, body, nesting, callback) { 
  if(nesting === 0) { 
    return process.nextTick(callback); 
  } 
 
  const links = utilities.getPageLinks(currentUrl, body); 
  if(links.length === 0) { 
    return process.nextTick(callback); 
  } 
 
  async.eachSeries(links, (link, callback) => { 
    spider(link, nesting - 1, callback); 
  }, callback); 
} 

If we compare the preceding code, which uses async, with the code of the same function implemented with plain JavaScript patterns, we will notice the big advantage that async gives us in terms of code organization and readability.

Parallel execution

The async library doesn't lack functions to handle parallel flows; among them we can find each(), map(), filter(), reject(), detect(), some(), every(), concat(), parallel(), applyEach(), and times(). They follow the same logic as the functions we have already seen for sequential execution, with the difference being that the tasks provided are executed in parallel.

To demonstrate that, we can try to apply one of these functions to implement version 3 of our web spider application, the one performing the downloads using an unlimited parallel flow.

If we remember the code we used earlier to implement the sequential version of the spiderLinks() function, adapting it to make it work in parallel is a trivial task:

function spiderLinks(currentUrl, body, nesting, callback) { 
  // ... 
  async.each(links, (link, callback) => { 
    spider(link, nesting - 1, callback); 
  }, callback); 
} 

The function is exactly the same one that we used for the sequential download, but this time we used async.each() instead of async.eachSeries(). This clearly demonstrates the power of abstracting the asynchronous flow with a library such as async. The code is not bound to a particular execution flow anymore; there is no code specifically written for that. Most of it is just application logic.

Limited parallel execution

If you are wondering if async can also be used to limit the concurrency of parallel tasks, the answer is yes, it can! We have a few functions we can use for that, namely, eachLimit(), mapLimit(), parallelLimit(), queue(), and cargo().
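For example, if we only wanted to limit the concurrency of the downloads triggered by a single page, a minimal sketch using async.eachLimit() inside spiderLinks() might look like this (the limit of 2 is an arbitrary choice):

async.eachLimit(links, 2, (link, callback) => {
  spider(link, nesting - 1, callback);
}, callback);

Note, however, that a limit applied this way is local to one invocation of spiderLinks() and not global to the whole application; this is one reason to prefer a queue, as we are about to do.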

Let's try to exploit one of them to implement version 4 of the web spider application, the one executing the download of the links in parallel with limited concurrency. Fortunately, async has async.queue(), which works in a similar way to the TaskQueue class we created earlier in the chapter. The async.queue() function creates a new queue, which uses a worker() function to execute a set of tasks with a specified concurrency limit:

const q = async.queue(worker, concurrency); 

The worker() function receives, as input, the task to run and a callback function to invoke when the task completes:

function worker(task, callback) 

We should notice that a task, in this case, can be anything, not just a function. In fact, it's the responsibility of the worker to handle a task in the most appropriate way. New tasks can be added to the queue using q.push(task, callback). The callback associated with a task has to be invoked by the worker after the task has been processed.
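As a minimal illustration, with a made-up task format (not the one we will use in the spider), a queue could be used as follows:

const q = async.queue((task, callback) => {
  console.log(`Processing ${task.name}`);   // the worker decides how to handle the task
  setTimeout(callback, 100);                // simulate an asynchronous operation
}, 2);

q.push({name: 'task1'}, () => console.log('task1 done'));
q.push({name: 'task2'}, () => console.log('task2 done'));
q.push({name: 'task3'}, () => console.log('task3 done'));   // waits until a worker is free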

Now, let's modify our code again to implement a parallel globally limited execution flow, using async.queue(). First of all, we need to create a new queue:

const downloadQueue = async.queue((taskData, callback) => { 
  spider(taskData.link, taskData.nesting - 1, callback); 
}, 2); 

The code is really straightforward. We are just creating a new queue with a concurrency limit of 2, having a worker that simply invokes our spider() function with the data associated with a task. Next, we implement the spiderLinks() function:

function spiderLinks(currentUrl, body, nesting, callback) { 
  if(nesting === 0) { 
    return process.nextTick(callback); 
  } 
  const links = utilities.getPageLinks(currentUrl, body); 
  if(links.length === 0) { 
    return process.nextTick(callback); 
  } 
  let completed = 0, hasErrors = false;
  links.forEach(link => {
    const taskData = {link: link, nesting: nesting}; 
    downloadQueue.push(taskData, err => { 
      if(err) { 
        hasErrors = true; 
        return callback(err); 
      } 
      if(++completed === links.length && !hasErrors) {
        callback(); 
      } 
    }); 
  }); 
} 

The preceding code should look very familiar, as it's almost the same as the one we used to implement the same flow using the TaskQueue object. Also, in this case, the important part to analyze is where we push a new task into the queue. At that point, we ensure that we pass a callback that enables us to check if all the download tasks for the current page are completed, and eventually invoke the final callback.

Thanks to async.queue(), we could easily replicate the functionality of our TaskQueue object, again demonstrating that with async, we can really avoid writing asynchronous control flow patterns from scratch, reducing our efforts and saving precious lines of code.
