Collection of Node.js streams.
$ npm install flow.io
To use flow:
var flow = require( 'flow.io' );
The flow library is composed of several smaller modules. If you want to roll your own flow, follow the links and import the individual modules.
The flow library includes the following stream factories...
File readStream factory.
var stream = flow.read()
.path( 'path/to/file' )
.stream( clbk );
stream.pipe( process.stdout );
File writeStream factory.
var stream = flow.write()
.path( 'path/to/destination' )
.stream( clbk );
readStream.pipe( stream );
Transform stream factory to parse JSON. Wraps JSONStream's parse stream.
var rStream = flow.read()
.path( 'path/to/file.json' )
.stream();
var pStream = flow.parse().stream();
rStream
.pipe( pStream )
.pipe( /* writable stream*/ );
Transform stream factory to stringify JSON. Wraps JSONStream's stringify stream.
var rStream = flow.read()
.path( 'path/to/file.json' )
.stream();
var pStream = flow.parse().stream();
var sStream = flow.stringify().stream();
var wStream = flow.write()
.path( 'path/to/destination.json' )
.stream();
rStream
.pipe( pStream )
.pipe( sStream )
.pipe( wStream );
Transform stream factory to convert an array into an element-by-element readable stream. Similar to event-stream's readArray, except that it is implemented as a transform stream. Useful when a sink stream generates an array but downstream streams in the pipeline require individual data elements.
Transform stream factory to chunk streamed data values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 1, 1, 1, 2, 2, 2, 3, 3, 3 ] );
var stream = flow.chunkify()
.numValues( 3 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to sink a specified number of streamed data values and then stream new data values as they arrive.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 1, 1, 1, 2, 3, 4, 5 ] );
var stream = flow.sinkandstream()
.numValues( 3 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to map a data value to another data value via a transformation function. Wraps event-stream's map stream.
Transform stream factory to perform a data reduction.
Transform stream factory to map numeric data stream values to their absolute values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ -1, -1, 0, 1, 1 ] );
var stream = flow.abs().stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to round numeric data stream values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 3.245, -0.9845, -20.446, 0.5, 1.0, 10 ] );
var stream = flow.round().stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to floor numeric data stream values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 3.245, -0.9845, -20.446, 0.5, 1.0, 10, 5.9999 ] );
var stream = flow.floor().stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to round numeric data stream values toward positive infinity.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 3.245, -0.9845, -20.446, 0.5, 1.0, 10, 5.9999 ] );
var stream = flow.ceil().stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to increment streamed numeric data values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 1, 0, 1, 1, 0 ] );
var stream = flow.add()
.add( 100 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to decrement streamed numeric data values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 1, 0, 1, 1, 0 ] );
var stream = flow.subtract()
.subtract( 1 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to perform scalar multiplication on streamed numeric data values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 12, 8, 34, 512, 72 ] );
var stream = flow.multiply()
.scalar( 10 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to perform scalar division on streamed numeric data values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 12, 8, 34, 512, 72 ] );
var stream = flow.divide()
.divisor( 10 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to exponentiate numeric data stream values according to a specified power.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 2, 4, 3, 5, 7 ] );
var stream = flow.pow()
.exponent( 3 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to evaluate an exponential function for each numeric data value.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 1.453, 0, 9.504, 102.2, 1 ] );
var stream = flow.exp().stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Transform stream factory to calculate the difference between successive streamed data values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 10, 8, 22, 100, 1 ] );
var stream = flow.diff().stream();
readStream
.pipe( stream )
.pipe( /* writable stream */ );
Reduce stream factory to count the number of streamed data elements and stream the result.
Reduce stream factory to find a numeric data stream's minimum value.
Transform stream factory to find sliding-window minimum values (moving min) over a numeric data stream.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [1,0,5,2,3,6,8,1,0] );
var stream = flow.mmin()
.window( 3 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream*/ );
Reduce stream factory to find a numeric data stream's maximum value.
Transform stream factory to find sliding-window maximum values (moving max) over a numeric data stream.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [1,0,5,2,3,6,8,1,0] );
var stream = flow.mmax()
.window( 3 )
.stream();
readStream
.pipe( stream )
.pipe( /* writable stream*/ );
Reduce stream factory to calculate a numeric data stream's sum.
Transform stream factory to compute a sliding-window sum over a numeric data stream.
Transform stream factory to compute the cumulative sum over a numeric data stream.
Reduce stream factory to find a numeric data stream's median value.
Reduce stream factory to compute quantiles from a numeric data stream.
Reduce stream factory to compute the inter-quartile range from a numeric data stream.
Reduce stream factory to compute a histogram over a numeric data stream.
Reduce stream factory to calculate a numeric data stream's mean.
Transform stream factory to compute a sliding-window average over a numeric data stream.
Transform stream factory to calculate arithmetic means for streamed data arrays (chunks).
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 1, 2, 1, 2, 2, 1, 4, 3, 5.5 ] );
var cStream = flow.chunkify()
.numValues( 3 )
.stream();
var mStream = flow.cmean()
.stream();
readStream
.pipe( cStream )
.pipe( mStream )
.pipe( /* writable stream */ );
Reduce stream factory to calculate a numeric data stream's variance.
Reduce stream factory to calculate the covariance between data elements in a numeric data stream.
Reduce stream factory to compute the Pearson product-moment correlation coefficient between data elements in a numeric data stream.
Reduce stream factory to calculate the sample skewness of streamed data values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 82, 34, 45, 56, 56, 71 ] );
var sStream = flow.skewness().stream();
readStream
.pipe( sStream )
.pipe( /* writable stream */ );
Reduce stream factory to calculate the sample excess kurtosis of streamed data values.
var readArray = require( 'event-stream' ).readArray;
var readStream = readArray( [ 82, 34, 45, 56, 56, 71 ] );
var kStream = flow.kurtosis().stream();
readStream
.pipe( kStream )
.pipe( /* writable stream */ );
Filter stream factory which finds stream values matching user-defined criteria.
Filter stream factory which removes any stream values which are not numeric.
Provides a mock source for writable streams.
Note: implemented as a classic stream.
var eventStream = require( 'event-stream' );
// Simulate some data:
var data = new Array( 100 );
for ( var i = 0; i < data.length; i++ ) {
data[ i ] = Math.random()*100;
}
// Create a writable stream:
var writable = eventStream.map( function( d, clbk ){
clbk( null, d.toString()+'\n' );
});
// Pipe to standard out:
writable.pipe( process.stdout );
// Start streaming...
mock( data, writable );
Provides a mock stream sink for readable streams.
Note: implemented as a classic stream.
var eventStream = require( 'event-stream' );
// Simulate some data:
var data = new Array( 100 );
for ( var i = 0; i < data.length; i++ ) {
data[ i ] = Math.random()*100;
}
// Create a readable stream:
var readStream = eventStream.readArray( data );
// Start streaming...
mock( readStream, onEnd );
function onEnd( error, data ) {
console.log( JSON.stringify( data ) );
}
Unit tests use the Mocha test framework with Chai assertions. To run the tests, execute the following command in the top-level application directory:
$ make test
All new feature development should have corresponding unit tests to validate correct functionality.
This repository uses Istanbul as its code coverage tool. To generate a test coverage report, execute the following command in the top-level application directory:
$ make test-cov
Istanbul creates a ./reports/coverage directory. To access an HTML version of the report:
$ open reports/coverage/lcov-report/index.html
Copyright © 2014. Athan Reines.