NodeJS Streams, Pump, and Pipe

December 9, 2015 | Article

A stream is an object that represents a generic sequence of bytes. Any type of data can be stored as a sequence of bytes, so the details of reading and writing data can be abstracted away.

Node has a useful abstraction for streams; more specifically, two very useful abstractions: Read Streams and Write Streams. They are implemented by several Node objects and represent either an inbound (ReadStream) or an outbound (WriteStream) flow of data.

Reading from and writing to streams is nothing special in itself, since almost every programming language offers it, but Node has its own distinctive way of doing it.

ReadStream

A ReadStream is like a faucet of data. How you create a stream depends on the type of stream. Once you have created one, you can: wait for data, know when it ends, pause it, and resume it.

Wait for data

By binding to the “data” event, we are notified every time the stream delivers a chunk. A chunk can be delivered as a Buffer or as a string.
If we use stream.setEncoding(encoding), the “data” events pass in strings. If we don’t set an encoding, the “data” events pass in buffers.

var readStream = ...
readStream.on('data', function(data) {
    // data is a buffer
});

var readStream = ...
readStream.setEncoding('utf8');
readStream.on('data', function(data) {
    // data is utf8-encoded string
});

In the first example, data is a Buffer; in the second, it is a string because we set the encoding to utf8.

The size of each chunk may vary: it can depend on the buffer size or on the amount of data available, so it is unpredictable.
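To see both behaviours side by side, here is a small sketch assuming a modern Node version. The in-memory stream built by the hypothetical makeStream() helper stands in for a real file or socket, and the collect() helper is likewise made up for illustration. Only the chunk types and the joined text are meaningful; the number of chunks is not guaranteed.

```javascript
const { Readable } = require('stream');

// Hypothetical in-memory readable stream: emits two Buffer chunks, then
// signals the end of the data with push(null).
function makeStream() {
  return new Readable({
    read() {
      this.push(Buffer.from('hello, '));
      this.push(Buffer.from('streams'));
      this.push(null);
    }
  });
}

// Hypothetical helper: record the type of every "data" chunk and the joined text.
function collect(stream, callback) {
  const types = [];
  let text = '';
  stream.on('data', function(chunk) {
    types.push(Buffer.isBuffer(chunk) ? 'buffer' : typeof chunk);
    text += chunk; // a Buffer is converted with toString() when concatenated
  });
  stream.on('end', function() {
    callback(types, text);
  });
}

// No encoding set: every chunk is a Buffer.
collect(makeStream(), function(types, text) {
  console.log(types, text);
});

// Encoding set: every chunk is a string.
const encoded = makeStream();
encoded.setEncoding('utf8');
collect(encoded, function(types, text) {
  console.log(types, text);
});
```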

Know when it ends

A stream can end, and we can find out when that happens by binding to the “end” event.

var readStream = ...
readStream.on('end', function() {
    console.log('the stream has ended');
});

Pause

A read stream is like a faucet, and we can keep the data from coming in by pausing it.

readStream.pause();

Resume

If the stream is paused, we can reopen the faucet and the data starts flowing again.

readStream.resume();
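The effect of pause() and resume() can be observed with a sketch like the following. The numbers() stream and the readWithPause() helper are hypothetical, and the 50 ms delay is arbitrary; the point is that no “data” events arrive between the pause and the resume.

```javascript
const { Readable } = require('stream');

// Hypothetical in-memory stream producing "1" to "5", then ending.
function numbers() {
  let n = 0;
  return new Readable({
    read() {
      n += 1;
      this.push(n <= 5 ? String(n) : null);
    }
  });
}

// Pause after the first chunk and resume 50 ms later, logging the order of events.
function readWithPause(stream, callback) {
  const events = [];
  stream.on('data', function(chunk) {
    events.push('data:' + chunk);
    if (events.length === 1) {
      stream.pause();
      events.push('paused');
      setTimeout(function() {
        events.push('resumed');
        stream.resume();
      }, 50);
    }
  });
  stream.on('end', function() {
    callback(events);
  });
}

readWithPause(numbers(), function(events) {
  console.log(events.join(' '));
});
```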

WriteStream

A WriteStream is an abstraction for a destination you can send data to. It can be a file, a network connection, or even an object that outputs transformed data.

Given a WriteStream object, we can do two things: write to it, and wait for it to drain.

Write

We can write data to the stream either as a string or as a buffer.

By default, write() treats a string as UTF-8 unless told otherwise; a different encoding can be passed as the second argument.

var writeStream = ...;

writeStream.write('this is an utf-8 string');
writeStream.write('7e3e4acde5ad240a8ef5e731e644fbd1', 'base64');

To write a buffer instead:

var writeStream = ...;
// Buffer.from() replaces the deprecated new Buffer() constructor
var buffer = Buffer.from('this is a buffer with some string');
writeStream.write(buffer);
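Whatever form the data takes, the destination ends up receiving bytes. The sketch below uses a hypothetical in-memory writable, makeSink(), that records everything it receives. It relies on Node's default decodeStrings: true, under which strings are converted to Buffers using the encoding given to write() before the stream's own write logic sees them.

```javascript
const { Writable } = require('stream');

// Hypothetical writable that records the raw bytes it receives.
function makeSink() {
  const chunks = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk); // already a Buffer because decodeStrings defaults to true
      callback();
    }
  });
  sink.received = function() {
    return Buffer.concat(chunks);
  };
  return sink;
}

const sink = makeSink();
sink.write('this is a utf-8 string');                         // encoded as UTF-8
sink.write('7e3e4acde5ad240a8ef5e731e644fbd1', 'base64');      // base64-decoded to 24 bytes
sink.write(Buffer.from('this is a buffer with some string'));  // written as-is
sink.end(function() {
  console.log('total bytes received:', sink.received().length);
});
```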

Wait for it to drain

Node does not block on I/O, so neither reads nor writes block. When writing, if Node cannot flush the data into the kernel buffers right away, it buffers the data in our process memory. That is why writeStream.write() returns a boolean: true if all the data was flushed to the kernel buffer, false if some of it had to be queued in memory. (In newer Node versions, false means the stream's internal buffer has reached its highWaterMark.)

Once the WriteStream has flushed the queued data, it emits a “drain” event, which we can listen for like this:

var writeStream = ...;
writeStream.on('drain', function() { console.log('drain emitted'); });
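Combining write()'s return value with the “drain” event gives the usual backpressure loop: keep writing while write() returns true, and when it returns false, wait for “drain” before continuing. A minimal sketch, assuming a deliberately slow, hypothetical writable with a tiny highWaterMark so the buffer fills up quickly; writeLines() is a made-up helper name.

```javascript
const { Writable } = require('stream');

// Hypothetical slow destination: 5 ms per chunk and a 16-byte highWaterMark,
// so write() starts returning false after a couple of short lines.
const slow = new Writable({
  highWaterMark: 16,
  write(chunk, encoding, callback) {
    setTimeout(callback, 5);
  }
});

// Write `total` lines, stopping whenever write() returns false and
// continuing only after the 'drain' event.
function writeLines(stream, total, done) {
  let i = 0;
  let drains = 0;
  function writeMore() {
    let ok = true;
    while (i < total && ok) {
      ok = stream.write('line ' + i + '\n');
      i += 1;
    }
    if (i < total) {
      // Internal buffer is full: wait for 'drain' before writing again.
      stream.once('drain', function() {
        drains += 1;
        writeMore();
      });
    } else {
      stream.end(function() { done(drains); });
    }
  }
  writeMore();
}

writeLines(slow, 20, function(drains) {
  console.log('finished after waiting for', drains, 'drain events');
});
```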

Stream by Example

FileSystem stream

We can create a read stream for a file path.

var fs = require('fs');

var rs = fs.createReadStream('/path/to/file');

We can also pass an options object to createReadStream(), for example to set the start and end positions in the file, the encoding, the flags, and the buffer size (called highWaterMark in newer Node versions). Below are the default values:

{
    flags: 'r',
    encoding: null,
    fd: null,
    mode: 0666,
    bufferSize: 64*1024
}
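The start and end options select a byte range of the file, and both positions are inclusive. A minimal sketch, assuming a sample file written to the OS temp directory under a made-up name:

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical sample file in the OS temp directory.
const file = path.join(os.tmpdir(), 'stream-options-demo.txt');
fs.writeFileSync(file, 'hello stream world');

// Read only bytes 6..11 (start and end are both inclusive) as UTF-8 text.
const rs = fs.createReadStream(file, { start: 6, end: 11, encoding: 'utf8' });
let slice = '';
rs.on('data', function(chunk) { slice += chunk; });
rs.on('end', function() {
  console.log(JSON.stringify(slice)); // prints "stream"
});
```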

We can also create a write stream:

var fs = require('fs');
var ws = fs.createWriteStream('/path/to/file');

createWriteStream() also accepts an options object as its second argument. Below are its default values:

{
    flags: 'w',
    encoding: null,
    mode: 0666
}

We can also specify just a single option when necessary:

var fs = require('fs');
var ws = fs.createWriteStream('/path/to/file', {encoding: 'utf8'});
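The flags option controls how the file is opened: with the default 'w' the file is created or truncated, while 'a' appends to it. A sketch assuming a made-up file in the OS temp directory:

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical target file in the OS temp directory.
const file = path.join(os.tmpdir(), 'write-stream-demo.txt');

// First stream: flags 'w' (the default) creates or truncates the file.
const ws = fs.createWriteStream(file, { encoding: 'utf8' });
ws.write('first line\n');
ws.end(function() {
  // Second stream: flags 'a' appends instead of truncating.
  const appender = fs.createWriteStream(file, { flags: 'a' });
  appender.end('second line\n', function() {
    console.log(fs.readFileSync(file, 'utf8'));
  });
});
```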

Case Study: Slow Client Problem

As mentioned before, Node does not block on writes; it buffers the data if the write cannot be flushed into the kernel buffers. Now suppose we are pumping data into a write stream (like a TCP connection to a browser) from a read stream (like a file ReadStream):

var fs = require('fs');

require('http').createServer(function(req, res) {
   var rs = fs.createReadStream('/path/to/big/file');
   rs.on('data', function(data) {
      res.write(data);
   });
   rs.on('end', function() {
      res.end();
   });
});

If the file is local, the read stream will be fast. If the connection to the client is slow, however, the write stream will be slow. So the read stream's “data” events fire quickly and the data is handed to the write stream, but eventually Node has to start buffering it because the kernel buffers are full.

As a result, /path/to/big/file can end up buffered in memory for each request, and with many concurrent requests Node's memory consumption will keep growing, which may lead to other problems such as swapping, thrashing, and memory exhaustion.
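The problem can be reproduced without a browser. The sketch below is a hypothetical simulation rather than real network code: a writable that takes about 1 ms per chunk plays the slow client, and we write to it without checking write()'s return value, as the handler above does. writableLength (available since Node 9.4) then reports how many bytes are sitting in process memory.

```javascript
const { Writable } = require('stream');

// Simulated slow client (illustration only): about 1 ms per chunk,
// with an explicit 16 KiB highWaterMark.
const slowClient = new Writable({
  highWaterMark: 16 * 1024,
  write(chunk, encoding, callback) {
    setTimeout(callback, 1);
  }
});

// Write 100 chunks of 64 KiB (the default fs read chunk size) and ignore
// write()'s return value, just like the naive handler.
let falseCount = 0;
for (let i = 0; i < 100; i++) {
  if (!slowClient.write(Buffer.alloc(64 * 1024))) {
    falseCount += 1; // write() asked us to stop, but we kept going
  }
}

// Almost all of the data is now queued in this process's memory.
const buffered = slowClient.writableLength;
console.log('bytes buffered in memory:', buffered);
console.log('write() returned false', falseCount, 'times');
slowClient.end();
```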

To address this problem, we have to use the read stream's pause() and resume(), pacing it to match the write stream so that memory does not fill up:

var fs = require('fs');

require('http').createServer(function(req, res) {
   var rs = fs.createReadStream('/path/to/big/file');
   rs.on('data', function(data) {
      if(!res.write(data)) {
         rs.pause();
      }
   });
   res.on('drain', function() {
      rs.resume();
   });
   rs.on('end', function() {
      res.end();
   });
});

We pause the read stream whenever the write cannot be flushed to the kernel, and resume it when the write stream has drained.
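Because this pattern works for any pair of streams, it can be packaged as a reusable function. pumpSketch below is a hypothetical name, not a Node API, and unlike a production helper it does no error handling:

```javascript
// Hypothetical helper: pipe `readable` into `writable` with manual backpressure.
function pumpSketch(readable, writable, callback) {
  readable.on('data', function(chunk) {
    if (!writable.write(chunk)) {
      readable.pause(); // destination buffer is full: stop reading
    }
  });
  writable.on('drain', function() {
    readable.resume(); // destination flushed: start reading again
  });
  readable.on('end', function() {
    writable.end(callback); // callback runs once everything has been flushed
  });
}

// Usage, mirroring the server above:
// var fs = require('fs');
// require('http').createServer(function(req, res) {
//   pumpSketch(fs.createReadStream('/path/to/big/file'), res);
// });
```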

Pump

This is a recurring pattern, and instead of this complicated chain of events we can simply use util.pump(), which does exactly what we described:

var util = require('util');
var fs = require('fs');

require('http').createServer(function(req, res) {
    var rs = fs.createReadStream('/path/to/big/file');
    util.pump(rs, res, function() {
        res.end();
    });
});

util.pump() accepts three arguments: the readable stream, the writable stream, and a callback that is called when the read stream ends.
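Note that util.pump() was later deprecated in favor of readable.pipe() and is not available in current Node releases. Since Node 10, stream.pipeline() covers the same ground: it handles backpressure, ends the destination, and on failure destroys both streams and passes the error to its callback. A minimal sketch, where sendFile() is a hypothetical helper and the port in the usage comment is arbitrary:

```javascript
const fs = require('fs');
const { pipeline } = require('stream');

// Hypothetical helper: stream a file into any writable (e.g. an HTTP response).
// pipeline() pauses/resumes as needed and reports the first error, if any.
function sendFile(filePath, destination, done) {
  pipeline(fs.createReadStream(filePath), destination, function(err) {
    done(err || null);
  });
}

// Usage inside an HTTP server:
// require('http').createServer(function(req, res) {
//   sendFile('/path/to/big/file', res, function(err) {
//     if (err) console.error('transfer failed:', err.message);
//   });
// }).listen(8080);
```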

Pipe

There is another approach we can use: pipe. A ReadStream can be piped into a WriteStream in the same fashion (pausing and resuming the source as needed) simply by calling pipe(destination).

var fs = require('fs');

require('http').createServer(function(req, res) {
    var rs = fs.createReadStream('/path/to/big/file');
    rs.pipe(res);
});

By default, end() is called on the destination when the read stream ends. We can prevent that by passing {end: false} in the options object (the second argument) and then ending the destination ourselves when the read stream emits “end”:

var fs = require('fs');

require('http').createServer(function(req, res) {
    var rs = fs.createReadStream('/path/to/big/file');
    rs.pipe(res, {end: false});
    rs.on('end', function() {
        res.end("And that's all folks!");
    });
});
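The same {end: false} behaviour can be checked without a server. In this sketch, hypothetical in-memory streams stand in for the file and the HTTP response:

```javascript
const { Readable, Writable } = require('stream');

// In-memory source standing in for the file (illustration only).
const source = new Readable({
  read() {
    this.push('file contents\n');
    this.push(null);
  }
});

// In-memory destination standing in for the HTTP response.
let output = '';
const destination = new Writable({
  write(chunk, encoding, callback) {
    output += chunk;
    callback();
  }
});

// Keep the destination open after the source ends...
source.pipe(destination, { end: false });
// ...so we can append a trailer and close it ourselves.
source.on('end', function() {
  destination.end("And that's all folks!");
});
destination.on('finish', function() {
  console.log(output);
});
```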

Creating Our Own Read and Write Streams

We can implement our own read and write streams.

ReadStream

When creating a Readable stream, we have to implement the following methods:

  • setEncoding(encoding)
  • pause()
  • resume()
  • destroy()

and emit the following events:

  • “data”
  • “end”
  • “error”
  • “close”
  • “fd” (not mandatory)

We should also implement the pipe() method, but we can get some help from Node by inheriting from Stream:

var util = require('util'),
    Stream = require('stream').Stream;

function MyClass() {
    Stream.call(this); // run the Stream constructor (sets up EventEmitter state)
}
util.inherits(MyClass, Stream);

This will make the pipe method available at no extra cost.
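In newer Node versions (0.10 onward), the usual way to build a readable stream is to extend stream.Readable and implement only _read(), calling push() with each chunk and push(null) at the end; Node then supplies pause(), resume(), setEncoding(), pipe() and the events. A sketch with a hypothetical CounterStream:

```javascript
const { Readable } = require('stream');

// Hypothetical readable stream that emits the numbers 1..max, one per line.
class CounterStream extends Readable {
  constructor(max, options) {
    super(options);
    this.current = 0;
    this.max = max;
  }

  // Called by Node whenever it wants more data.
  _read() {
    this.current += 1;
    if (this.current > this.max) {
      this.push(null); // no more data: triggers 'end'
    } else {
      this.push(String(this.current) + '\n');
    }
  }
}

new CounterStream(3).pipe(process.stdout);
```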

WriteStream

To implement our own WriteStream-ready pseudo-class we should provide the following methods:

  • write(string, encoding='utf8', [fd])
  • write(buffer)
  • end()
  • end(string, encoding)
  • end(buffer)
  • destroy()

and emit the following events:

  • “drain”
  • “error”
  • “close”
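Likewise, newer Node versions let you extend stream.Writable and implement only _write(chunk, encoding, callback); write(), end(), buffering and the “drain”, “finish” and “error” events come from the base class. A sketch with a hypothetical ByteCounter:

```javascript
const { Writable } = require('stream');

// Hypothetical writable stream that counts the bytes written to it.
class ByteCounter extends Writable {
  constructor(options) {
    super(options);
    this.bytes = 0;
  }

  _write(chunk, encoding, callback) {
    // chunk is a Buffer here because decodeStrings defaults to true
    this.bytes += chunk.length;
    callback(); // signal that this chunk has been handled
  }
}

const counter = new ByteCounter();
counter.write('hello ');
counter.end(Buffer.from('world'), function() {
  console.log(counter.bytes, 'bytes written'); // prints "11 bytes written"
});
```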

About Author

xathrya

A man who is obsessed with low-level technology.
