Skip to content

Streams

ry edited this page Aug 6, 2010 · 10 revisions

Node is working towards a unified data stream interface.

Readable Stream

  • event: 'data'. (One argument, a Buffer object.)
  • event: 'end'
  • method: pause()
  • method: resume()

Examples of readable streams (some of these don’t exist yet): server-side HTTP request, client-side HTTP response, process.stdin, childProcess.stdout, childProcess.stderr

Writable Stream

  • method: write(data) (Returns true if it could be flushed to kernel, false if not. Data is always queued for writing, even if flushing is not possible. If this method returns false then the event 'drain' will be emitted later, once all data that was buffered has been written to the kernel.)
  • method: close()
  • event: 'drain'

Examples of writable streams (some of these don’t exist yet): server-side HTTP response, client-side HTTP request, process.stdout, process.stderr, childProcess.stdin

Utility Functions

Once there is a unified interface to streams some easy utility functions are possible which route data smartly. For example, writing an HTTP request to a file, with all the proper throttling and buffering, is easy as

http.createServer(function (req, res) {
  if (req.method == 'POST') {
    // Open writable file system
    var temp = fs.openTemporaryFile(); 
    // Pump the request into the temp file.
    stream.pump(req, temp, function (err) {
      if (err) {
        res.writeHead(500, {'Content-Type' : 'text/plain'});
        res.write('Error uploading.\n');
      } else {
        res.writeHead(200, {'Content-Type' : 'text/plain'});
        res.write('Successful upload\n');
      }
      res.close();
    });
  } else {
    res.writeHead(200, {'Content-Type' : 'text/plain'});
    res.write('Hello\n');
    res.close();
  }
});

The pump function might look something like this

function pump (readable, writable, cb) {
  readable.addListener('data', function (data) {
    if (!writable.write(data)) readable.pause();
  });

  readable.addListener('end', function () {
    writable.close();
    if (cb) cb();
  });

  writable.addListener('drain', function () {
    readable.resume();
  });
}
Clone this wiki locally