Piping patterns

As in real-life plumbing, Node.js streams can also be piped together following different patterns; we can, in fact, merge the flow of two different streams into one, split the flow of one stream into two or more pipes, or redirect the flow based on a condition. In this section, we are going to explore the most important plumbing techniques that can be applied to Node.js streams.

Combining streams

In this chapter, we have repeatedly stressed the fact that streams provide a simple infrastructure to modularize and reuse our code, but there is one last piece missing in this puzzle: what if we want to modularize and reuse an entire pipeline? What if we want to combine multiple streams so that they look like one from the outside? The following figure shows what this means:

Combining streams

From the preceding diagram, we should already get a hint of how this works:

  • When we write into the combined stream, we are actually writing into the first stream of the pipeline
  • When we read from the combined stream, we are actually reading from the last stream of the pipeline

A combined stream is usually a Duplex stream, which is built by connecting the first stream to its Writable side and the last stream to its Readable side.

Tip

To create a Duplex stream out of two different streams, one Writable and one Readable, we can use an npm module such as duplexer2 (https://npmjs.org/package/duplexer2).

But that's not enough; in fact, another important characteristic of a combined stream is that it has to capture all the errors that are emitted from any stream inside the pipeline. As we already mentioned, error events are not automatically propagated down the pipeline; so, if we want to have proper error management (and we should), we have to explicitly attach an error listener to each stream. However, if the combined stream is really a black box, meaning that we don't have access to any of the streams in the middle of the pipeline, it's crucial for the combined stream to also act as an aggregator for all the errors coming from any stream in the pipeline.

To recap, a combined stream has two major advantages:

  • We can redistribute it as a black box by hiding its internal pipeline
  • We have simplified error management, as we don't have to attach an error listener to each stream in the pipeline, but just to the combined stream itself
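
To give an idea of what happens under the hood, the following is a minimal sketch of how such a combine() function could be implemented on top of duplexer2; it assumes duplexer2's default behavior of re-emitting the errors of the two streams it wraps, and it is not meant to replace a battle-tested module such as multipipe:

const duplexer2 = require('duplexer2');

function combine(...streams) {
  // Connect the streams into a single pipeline:
  // streams[0] -> streams[1] -> ... -> streams[n-1]
  for (let i = 0; i < streams.length - 1; i++) {
    streams[i].pipe(streams[i + 1]);
  }

  // Writes go into the first stream, reads come from the last one
  const combined = duplexer2(streams[0], streams[streams.length - 1]);

  // duplexer2 already re-emits the errors of the two streams it wraps,
  // so we only need to forward the errors of the streams in the middle
  streams.slice(1, -1).forEach(stream =>
    stream.on('error', err => combined.emit('error', err))
  );

  return combined;
}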

Combining streams is a pretty generic and common practice, so if we don't have any particular need we might just want to reuse an existing solution such as multipipe (https://www.npmjs.org/package/multipipe) or combine-stream (https://www.npmjs.org/package/combine-stream), just to name two.

Implementing a combined stream

To illustrate a simple example, let's consider the case of the following two transform streams:

  • One that both compresses and encrypts the data
  • One that both decrypts and decompresses the data

Using a library such as multipipe, we can easily build these streams by combining some of the streams that we already have available from the core libraries (file combinedStreams.js):

const zlib = require('zlib'); 
const crypto = require('crypto'); 
const combine = require('multipipe'); 
 
module.exports.compressAndEncrypt = password => { 
  return combine( 
    zlib.createGzip(), 
    crypto.createCipher('aes192', password) 
  ); 
}; 
 
module.exports.decryptAndDecompress = password => { 
  return combine( 
    crypto.createDecipher('aes192', password), 
    zlib.createGunzip() 
  ); 
}; 

We can now use these combined streams as if they were black boxes, for example, to create a small application that archives a file by compressing and encrypting it. Let's do that in a new module named archive.js:

const fs = require('fs'); 
const compressAndEncryptStream = 
  require('./combinedStreams').compressAndEncrypt; 
 
fs.createReadStream(process.argv[3]) 
  .pipe(compressAndEncryptStream(process.argv[2])) 
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc")); 

We can further improve the preceding code by building a combined stream out of the pipeline that we created, this time not to obtain a reusable black box but only to take advantage of its aggregated error management. In fact, as we have already mentioned many times, writing something such as the following will only catch the errors that are emitted by the last stream:

fs.createReadStream(process.argv[3]) 
  .pipe(compressAndEncryptStream(process.argv[2])) 
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc")) 
  .on('error', err => { 
    //Only errors from the last stream 
    console.log(err); 
  }); 

However, by combining all the streams together, we can fix the problem elegantly. Let's then rewrite the archive.js file as follows:

const combine = require('multipipe'); 
const fs = require('fs'); 
const compressAndEncryptStream = 
  require('./combinedStreams').compressAndEncrypt; 
 
combine( 
  fs.createReadStream(process.argv[3]) 
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc")) 
).on('error', err => { 
  //this error may come from any stream in the pipeline 
  console.log(err); 
}); 

As we can see, we can now attach an error listener directly to the combined stream and it will receive any error event that is emitted by any of its internal streams.

Now, to run the archive module, simply specify a password and a file as command-line arguments:

node archive mypassword /path/to/a/file.txt

With this example, we have clearly demonstrated how important it is to combine streams: on one hand, it allows us to create reusable compositions of streams and, on the other, it simplifies the error management of a pipeline.

Forking streams

We can perform a fork of a stream by piping a single Readable stream into multiple Writable streams. This is useful when we want to send the same data to different destinations, for example, two different sockets or two different files. It can also be used when we want to perform different transformations on the same data, or when we want to split the data based on some criteria. The following figure gives us a graphical representation of this pattern:

Forking streams

Forking a stream in Node.js is a trivial matter; let's see why by working on an example.

Implementing a multiple checksum generator

Let's create a small utility that outputs both the sha1 and md5 hashes of a given file. Let's call this new module generateHashes.js and let's start by initializing our checksum streams:

const fs = require('fs'); 
const crypto = require('crypto'); 
 
const sha1Stream = crypto.createHash('sha1'); 
sha1Stream.setEncoding('base64'); 
 
const md5Stream = crypto.createHash('md5'); 
md5Stream.setEncoding('base64'); 

Nothing special so far; the next part of the module is actually where we will create a Readable stream from a file and fork it to two different streams in order to obtain two other files, one containing the sha1 hash and the other containing the md5 checksum:

const inputFile = process.argv[2]; 
const inputStream = fs.createReadStream(inputFile); 
inputStream 
  .pipe(sha1Stream) 
  .pipe(fs.createWriteStream(inputFile + '.sha1')); 
 
inputStream 
  .pipe(md5Stream) 
  .pipe(fs.createWriteStream(inputFile + '.md5')); 

Very simple, right? The inputStream variable is piped into sha1Stream on one side and md5Stream on the other. There are a couple of things to note, though, that happen behind the scenes:

  • Both md5Stream and sha1Stream will be ended automatically when inputStream ends, unless we specify {end: false} as an option when invoking pipe() (see the short example after this list)
  • The two forks of the stream will receive the same data chunks, so we must be very careful when performing side-effect operations on the data, as that would affect every stream that we are forking to
  • Back-pressure will work out-of-the-box; the flow coming from inputStream will go as fast as the slowest branch of the fork!
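
As a quick illustration of the first point, the following is a hypothetical variation of the previous fork (process.stdout and the .copy file are used here only as examples; they are not part of the checksum generator):

// This extra fork keeps process.stdout open when inputStream ends
inputStream.pipe(process.stdout, {end: false});

// This fork, instead, ends its destination automatically
inputStream.pipe(fs.createWriteStream(inputFile + '.copy'));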

Merging streams

Merging is the opposite operation to forking and consists of piping a set of Readable streams into a single Writable stream, as shown in the following figure:

Merging streams

Merging multiple streams into one is in general a simple operation; however, we have to pay attention to the way we handle the end event, as piping using the auto end option will cause the destination stream to be ended as soon as one of the sources ends. This can often lead to an error situation, as the other active sources will still continue to write to an already terminated stream. The solution to this problem is to use the option {end: false} when piping multiple sources to a single destination and then invoke end() on the destination only when all the sources have completed reading.

Creating a tarball from multiple directories

To make a simple example, let's implement a small program that creates a tarball from the contents of two different directories. For this purpose, we are going to introduce two new npm packages:

  • tar (https://npmjs.org/package/tar): a streaming library to create tarballs
  • fstream (https://npmjs.org/package/fstream): a library to create object streams from filesystem files and directories

Our new module is going to be called mergeTar.js; let's define its contents starting from some initialization steps:

const tar = require('tar'); 
const fstream = require('fstream'); 
const path = require('path'); 
 
const destination = path.resolve(process.argv[2]); 
const sourceA = path.resolve(process.argv[3]); 
const sourceB = path.resolve(process.argv[4]); 

In the preceding code, we are just loading all the dependencies and initializing the variables that contain the name of the destination file and the two source directories (sourceA and sourceB).

Next, we will create the tar stream and pipe it into its destination:

const pack = tar.Pack(); 
pack.pipe(fstream.Writer(destination)); 

Now it's time to initialize the source streams:

let endCount = 0; 
function onEnd() { 
  if(++endCount === 2) { 
    pack.end(); 
  } 
} 
 
const sourceStreamA = fstream.Reader({type: "Directory", path: sourceA}) 
  .on('end', onEnd); 
 
const sourceStreamB = fstream.Reader({type: "Directory", path: sourceB}) 
  .on('end', onEnd); 

In the preceding code, we create the streams that read from the two source directories (sourceStreamA and sourceStreamB); then, for each source stream, we attach an end listener, which will terminate the pack stream only when both directories have been read completely.

Finally, it is time to perform the real merge:

sourceStreamA.pipe(pack, {end: false}); 
sourceStreamB.pipe(pack, {end: false}); 

We pipe both the sources into the pack stream and take care to disable the auto ending of the destination stream by providing the option {end: false} to the two pipe() invocations.

With this, we have completed our simple TAR utility. We can try this utility by providing the destination file as the first command-line argument, followed by the two source directories:

node mergeTar dest.tar /path/to/sourceA /path/to/sourceB

To conclude this section, it's worth mentioning that, on npm, we can find a few modules that can simplify the merging of streams, for example, merge-stream (https://npmjs.org/package/merge-stream).

As a last note on the stream merging pattern, it's worth remembering that the data piped into the destination stream is randomly intermingled; this is a property that can be acceptable in some types of object streams (as we saw in the last example), but it is often an undesired effect when dealing with binary streams.

However, there is one variation of the pattern that allows us to merge streams in order; it consists of consuming the source streams one after the other: when the previous one ends, the next one starts emitting chunks (it is like concatenating the output of all the sources). As always, on npm we can find some packages that deal with this situation as well. One of them is multistream (https://npmjs.org/package/multistream).
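
Just to give an idea, a minimal sketch of this in-order variation using multistream could look as follows (the file names are purely illustrative):

const MultiStream = require('multistream');
const fs = require('fs');

const sources = [
  fs.createReadStream('logA.txt'),   // consumed first
  fs.createReadStream('logB.txt')    // starts emitting only after logA.txt ends
];

// The resulting Readable concatenates the output of the sources in order
new MultiStream(sources).pipe(fs.createWriteStream('combined.txt'));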

Multiplexing and demultiplexing

There is a particular variation of the merge stream pattern in which we don't really want to just join multiple streams together but, instead, use a shared channel to deliver the data of a set of streams. This is a conceptually different operation because the source streams remain logically separated inside the shared channel, which allows us to split the stream again once the data reaches the other end of the shared channel. The following figure clarifies the situation:

Multiplexing and demultiplexing

The operation of combining multiple streams together (in this case, also known as channels) to allow transmission over a single stream is called multiplexing, while the opposite operation, namely reconstructing the original streams from the data received from a shared stream, is called demultiplexing. The devices that perform these operations are called multiplexer (or mux) and demultiplexer (or demux) respectively. This is a widely studied area in computer science and telecommunications in general, as it is one of the foundations of almost any type of communication media such as telephony, radio, TV, and of course the Internet itself. For the scope of this book, we will not go too far with the explanations, as this is a vast topic.

What we want to demonstrate in this section, instead, is how it's possible to use a shared Node.js stream in order to convey multiple logically separated streams that are then split again at the other end of the shared stream.

Building a remote logger

Let's use an example to drive our discussion. We want a small program that starts a child process and redirects both its standard output and standard error to a remote server, which in turn saves the two streams into two separate files. So, in this case, the shared medium is a TCP connection, while the two channels to be multiplexed are the stdout and stderr of a child process. We will leverage a technique called packet switching, the same technique that is used by protocols such as IP, TCP, or UDP, and that consists of wrapping the data into packets, allowing us to specify various pieces of meta information useful for multiplexing, routing, controlling the flow, checking for corrupted data, and so on. The protocol that we are going to implement for our example is very minimalist; in fact, we will simply wrap our data into packets with the following structure:

Building a remote logger

As shown in the preceding figure, the packet contains the actual data, but also a header (Channel ID + Data length), which will make it possible to differentiate the data of each stream and enable the demultiplexer to route the packet to the right channel.

Client side – multiplexing

Let's start to build our application from the client side. With a lot of creativity, we will call the module client.js; this represents the part of the application that is responsible for starting a child process and multiplexing its streams.

So, let's start by defining the module. First, we need some dependencies:

const child_process = require('child_process'); 
const net = require('net'); 

Then, let's implement a function that performs the multiplexing of a list of sources:

function multiplexChannels(sources, destination) {
  let totalChannels = sources.length;
  for(let i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function() {                             //[1]
        let chunk;
        while((chunk = this.read()) !== null) {
          const outBuff = Buffer.alloc(1 + 4 + chunk.length);  //[2]
          outBuff.writeUInt8(i, 0);
          outBuff.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuff, 5);
          console.log('Sending packet to channel: ' + i);
          destination.write(outBuff);                          //[3]
        }
      })
      .on('end', () => {                                       //[4]
        if(--totalChannels === 0) {
          destination.end();
        }
      });
  }
}

The multiplexChannels() function takes in as input the source streams to be multiplexed and the destination channel, and then it performs the following steps:

  1. For each source stream, it registers a listener for the readable event where we read the data from the stream using the non-flowing mode.
  2. When a chunk is read, we wrap it into a packet that contains in order: 1 byte (UInt8) for the channel ID, 4 bytes (UInt32BE) for the packet size, and then the actual data.
  3. When the packet is ready, we write it into the destination stream.
  4. Finally, we register a listener for the end event so that we can terminate the destination stream when all the source streams are ended.

Tip

Our protocol is able to multiplex up to 256 different source streams, because we have only 1 byte to identify the channel.

Now the last part of our client becomes very easy:

const socket = net.connect(3000, () => {                    //[1]
  const child = child_process.fork(                         //[2]
    process.argv[2],
    process.argv.slice(3),
    {silent: true}
  );
  multiplexChannels([child.stdout, child.stderr], socket);  //[3]
});

In this last code fragment, we perform the following operations:

  1. We create a new TCP client connection to the address localhost:3000.
  2. We start the child process by using the first command-line argument as the path, while we provide the rest of the process.argv array as arguments for the child process. We specify the option {silent: true}, so that the child process does not inherit stdout and stderr of the parent.
  3. Finally, we take stdout and stderr of the child process and we multiplex them into socket using the multiplexChannels() function.

Server side – demultiplexing

Now we can take care of creating the server side of the application (server.js), where we demultiplex the streams from the remote connection and pipe them into two different files. Let's start by creating a function called demultiplexChannel():

const net = require('net'); 
const fs = require('fs'); 
 
function demultiplexChannel(source, destinations) { 
  let currentChannel = null; 
  let currentLength = null; 
 
  source 
    .on('readable', () => {                                      //[1] 
      let chunk; 
      if(currentChannel === null) {                              //[2] 
        chunk = source.read(1); 
        currentChannel = chunk && chunk.readUInt8(0); 
      } 
 
      if(currentLength === null) {                               //[3] 
        chunk = source.read(4); 
        currentLength = chunk && chunk.readUInt32BE(0); 
        if(currentLength === null) { 
          return; 
        } 
      } 
 
      chunk = source.read(currentLength);                         //[4] 
      if(chunk === null) { 
        return; 
      } 
      console.log('Received packet from: ' + currentChannel); 
      destinations[currentChannel].write(chunk);                  //[5] 
      currentChannel = null; 
      currentLength = null; 
    }) 
    .on('end', () => {                                            //[6]
      destinations.forEach(destination => destination.end()); 
      console.log('Source channel closed'); 
    }); 
} 

The preceding code might look complicated but it is not; thanks to the pull nature of Node.js Readable streams, we can easily implement the demultiplexing of our little protocol as follows:

  1. We start reading from the stream using the non-flowing mode.
  2. First, if we have not read the channel ID yet, we try to read 1 byte from the stream and then transform it into a number.
  3. The next step is to read the length of the data. We need 4 bytes for that, so it's possible (even if unlikely) that we don't have enough data in the internal buffer, which will cause the source.read() invocation to return null. In such a case, we simply interrupt the parsing and retry at the next readable event.
  4. Once we know the size of the data, we also know how much to pull from the internal buffer, so we try to read it all.
  5. When we read all the data, we can write it to the right destination channel, making sure that we reset the currentChannel and currentLength variables (these will be used to parse the next packet).
  6. Lastly, we make sure to end all the destination channels when the source channel ends.

Now that we can demultiplex the source stream, let's put our new function to work:

net.createServer(socket => { 
  const stdoutStream = fs.createWriteStream('stdout.log'); 
  const stderrStream = fs.createWriteStream('stderr.log');  
  demultiplexChannel(socket, [stdoutStream, stderrStream]); 
}).listen(3000, () => console.log('Server started')); 

In the preceding code, we first start a TCP server on port 3000; then, for each connection that we receive, we create two Writable streams pointing to two different files, one for the standard output and another for the standard error; these are our destination channels. Finally, we use demultiplexChannel() to demultiplex the socket stream into stdoutStream and stderrStream.

Running the mux/demux application

Now, we are ready to try our new mux/demux application, but first let's create a small Node.js program to produce some sample output; let's call it generateData.js:

console.log("out1"); 
console.log("out2"); 
console.error("err1"); 
console.log("out3"); 
console.error("err2"); 

Okay, now we are ready to try our remote logging application. First, let's start the server:

node server

Then the client, providing the file that we want to start as a child process:

node client generateData.js

The client will run almost immediately, but at the end of the process, the standard output and standard error of the generateData application have traveled through a single TCP connection and then, on the server, have been demultiplexed into two separate files.

Note

Note that, as we are using child_process.fork() (http://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options), our client will only be able to launch other Node.js modules.

Multiplexing and demultiplexing object streams

The example we have just shown demonstrates how to multiplex and demultiplex a binary/text stream, but it's worth mentioning that the same rules apply to object streams as well. The greatest difference is that, using objects, we already have a way to transmit the data using atomic messages (the objects), so multiplexing would be as easy as setting a channelID property in each object, while demultiplexing would simply involve reading the channelID property and routing each object towards the right destination stream.
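
As a rough sketch of this idea (the function names are made up for this example and backpressure handling is kept to a minimum), multiplexing and demultiplexing object streams could look as follows:

const stream = require('stream');

// Tags every object passing through with the given channel ID
function createObjectMultiplexer(channelID) {
  return new stream.Transform({
    objectMode: true,
    transform(obj, encoding, done) {
      this.push({channelID: channelID, data: obj});
      done();
    }
  });
}

// Routes each packet to the destination registered for its channel ID
function createObjectDemultiplexer(destinations) {
  return new stream.Writable({
    objectMode: true,
    write(packet, encoding, done) {
      destinations[packet.channelID].write(packet.data, done);
    }
  });
}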

Another pattern involving only demultiplexing consists of routing the data coming from a source depending on some condition. With this pattern, we can implement complex flows, such as the one shown in the following diagram:

Multiplexing and demultiplexing object streams

The demultiplexer used in the system described by the preceding diagram takes a stream of objects representing animals and distributes each of them to the right destination stream based on the class of the animal: reptiles, amphibians, and mammals.

Using the same principle, we can also implement an if...else statement for streams; for some inspiration, take a look at the ternary-stream package (https://npmjs.org/package/ternary-stream) that allows us to do exactly that.
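
To give a flavor of how such a conditional demultiplexer could be implemented by hand (this is only a sketch with made-up names; ternary-stream offers a more complete solution), consider the following:

const stream = require('stream');

// A minimal if...else for object streams: objects satisfying the condition
// go to destinationIf, all the others go to destinationElse
function createConditionalDemultiplexer(condition, destinationIf, destinationElse) {
  return new stream.Writable({
    objectMode: true,
    write(obj, encoding, done) {
      const destination = condition(obj) ? destinationIf : destinationElse;
      destination.write(obj, done);
    }
  });
}

// For example, with the animal streams of the preceding diagram:
// source.pipe(createConditionalDemultiplexer(
//   animal => animal.class === 'mammal', mammalsStream, othersStream));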
