Streams: My Favorite Tool
Before we talk about why Streams is my favorite tool in the world, let's look at a couple of scenarios.
Scenario 1
Problem: Help! I need to load a 300 GB file, process it, then insert each item into SQL!
Solution: Streams
Scenario 2
Problem: I mapped over an array about 300 times and need to return the final result
Solution: Streams
Scenario 3
Problem: I'm looking to iterate over a buffer for which I don't know when I'll get the next item, and I don't particularly want to create some crazy logic tree to wait for it
Solution: Streams
Scenario 4
Problem: I have a series of mapping and write functions that I need to run on an endless generator function, an array, and an EventEmitter
Solution: Streams
I think you get my point: in all the scenarios I've described, Streams can be the solution.
Streams are one of the things I can't live without. The pipe method is a perfect example of how Streams do things right.
// Streams at their prime
var rawData = createRawStream();
var digestRaw = createRawToJSValues();
var handleData = createJSONMapper();
var prepareForOutput = createOutputPreparation();
var output = createWriteStream();
rawData
  .pipe(digestRaw)
  .pipe(handleData)
  .pipe(prepareForOutput)
  .pipe(output);
A Little History
Here's the Wikipedia page on the stream processing paradigm. There, the authors mention GPUs and other high-intensity processing. While that is an understandable reason for people to use Streams, it's hard to track down why node.js chose to implement its fs module as Streams. I suspect the reason is heavily influenced by unix pipes, since node.js borrows the pipe terminology.
Pipe syntax can be fun to play with sometimes, but it's mostly useless unless you're just experimenting with it. For browsers, one would assume that the closest implementation is MediaStreams; however, they have not been standardized. The Web Audio API, while not using the node.js Stream API, certainly implements a similar one: it uses AudioSourceNode and AudioDestinationNode instead of ReadStreams and WriteStreams. Hopefully we will have standardized Streams available in the browser someday.
Subtle Aspects
Push and Pull Stream Sources
There are two main forms of Stream sources. A pull source allows Stream consumers (write or transform) to retrieve items as they need them. A push source sends data down the pipe regardless of whether anyone is ready. In most cases the difference won't matter to you, but once you get into situations where backpressure is a problem, or where being "live" matters more than ensuring "success", it quickly becomes apparent what the two are for.
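A minimal sketch of the two shapes (the payload strings are just placeholders):
const { Readable } = require('stream');
// Pull source: _read is only called when the consumer asks for more.
const pullSource = new Readable({
  read() {
    this.push('produced on demand');
  }
});
// Push source: data arrives on the producer's schedule,
// whether or not the consumer is keeping up.
const pushSource = new Readable({ read() {} });
setInterval(() => pushSource.push('produced whenever'), 100);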
Processing as you need it
fs.createReadStream is a great example of processing items as you need them. Here, we can directly specify how many bytes should be skipped and where to stop, then use the documented read method to take chunks at a time. It's important to note that calling read directly is not recommended.
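A quick sketch of those range options, with a placeholder file name:
const fs = require('fs');
// Read only bytes 1000 through 1999 of the file.
const ranged = fs.createReadStream('./some-large-file.bin', { start: 1000, end: 1999 });
ranged.on('data', (chunk) => console.log('chunk of', chunk.length, 'bytes'));
However, this sort of processing can also be built for other sources. If you have a source that allows you to read chunks at a time through a method, here is a way to implement it: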
const { Readable } = require('stream');
// Pulls chunks from a source on demand: readFn(start, end) must return a
// Promise that resolves with the bytes in that range.
class OnDemandMinimalChunks extends Readable {
  constructor(chunkSize, readFn, initialOffset, limit) {
    super();
    this.chunkSize = chunkSize;
    this.offset = initialOffset;
    this.limit = limit;
    this.readFn = readFn;
    this.isReading = false;
  }
  _read(size) {
    if (this.isReading) {
      return;
    }
    this.isReading = true;
    this.readFn(
      this.offset,
      Math.min(this.offset + this.chunkSize, this.limit)
    ).then((value) => {
      this.push(value);
      this.offset += this.chunkSize;
      if (this.offset >= this.limit) {
        this.push(null); // no more data to read
      }
      this.isReading = false;
    }, (err) => {
      this.destroy(err);
    });
  }
}
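A hypothetical usage, where fetchRange is assumed to be a promise-returning range reader you already have:
// Read 64 KiB chunks of the first megabyte, on demand.
const chunks = new OnDemandMinimalChunks(65536, fetchRange, 0, 1048576);
chunks.on('data', (chunk) => console.log('received', chunk.length, 'bytes'));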
Merging multiple sources
This may sound like a cute gimmick, but it has very real possibilities. A merged Stream can act as a collective logger, a game event emitter, or even join diffs of a buffered string from multiple sources. It's tempting to write off a merged Stream as ludicrous, but I'm confident you'll understand once you need it. Though merging everything at once may be fine, you may also be interested in ordered merges.
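A minimal merge sketch: pipe every source into one PassThrough and only end it once the last source finishes:
const { PassThrough } = require('stream');
function mergeStreams(sources) {
  const merged = new PassThrough({ objectMode: true });
  let active = sources.length;
  for (const source of sources) {
    source.pipe(merged, { end: false }); // don't end on any single source's end
    source.once('end', () => {
      if (--active === 0) merged.end(); // end once every source has finished
    });
  }
  return merged;
}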
Ensuring no data is lost
Since we don't know when the read stream will begin, if you're not consuming it synchronously, it's generally a good idea to pipe it to a passthrough, especially if you're creating the consumer asynchronously. While the read stream does buffer, you may need to pause it to prevent any loss; I have personally run into issues where initial data was lost.
Here's an example of what I have implemented in the past:
const { Duplex } = require('stream');
// Buffers written data until a reader asks for it, so chunks written
// before a consumer attaches are not lost.
class StreamBuffer extends Duplex {
  constructor() {
    super({ allowHalfOpen: true });
    this.bufferedData = Buffer.alloc(0);
    this.waiting = false; // false, or the number of bytes the reader wants
    this.hasEnded = false;
  }
  _write(chunk, encoding, callback) {
    var data = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    if (this.waiting !== false) {
      if (this.waiting > data.length) {
        // The reader wants more than this chunk: hand it all over.
        this.waiting -= data.length;
        this.push(data);
        data = null;
      } else {
        // Satisfy the reader and keep the remainder.
        this.push(data.slice(0, this.waiting));
        data = data.slice(this.waiting);
        this.waiting = false;
      }
    }
    if (data && data.length) {
      this.bufferedData = Buffer.concat([this.bufferedData, data]);
    }
    callback();
  }
  _final(cb) {
    this.hasEnded = true;
    if (this.waiting !== false) {
      this.push(null);
    }
    cb();
  }
  _read(size) {
    if (this.bufferedData.length === 0) {
      if (this.hasEnded) {
        this.push(null);
        return;
      }
      this.waiting = size;
      return;
    }
    if (size >= this.bufferedData.length) {
      var remaining = size - this.bufferedData.length;
      this.waiting = remaining > 0 ? remaining : false;
      this.push(this.bufferedData);
      this.bufferedData = Buffer.alloc(0);
      return;
    }
    this.push(this.bufferedData.slice(0, size));
    this.bufferedData = this.bufferedData.slice(size);
  }
}
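A hypothetical usage, where the consumer only exists later (source and createConsumerAsync are placeholders):
const holding = new StreamBuffer();
source.pipe(holding); // writes are kept until someone reads
createConsumerAsync().then((consumer) => holding.pipe(consumer));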
Versatility of the Duplex
A Duplex is both a read stream and a write stream. Sure, that alone sounds awesome and dandy, but the possibilities aren't limited to "transform streams".
Only processing the "latest"
First off, you will need to "itemize" your Streams so you can know how late is too late. But transforming data doesn't just mean mapping it or filtering it. It can also mean flow control. This includes LIFO as well as only processing the latest item found. You can also adapt the implementation below to keep more than one item via its limit parameter.
const { Duplex } = require('stream');
// Keeps only the most recent `limit` items; older ones are dropped.
class OnlyLatestBuffer extends Duplex {
  constructor(limit) {
    super({ objectMode: true, allowHalfOpen: true });
    this.limit = limit || 1;
    this.bufferedItems = [];
    this.waiting = false;
    this.hasEnded = false;
  }
  _write(chunk, encoding, callback) {
    if (this.waiting !== false) {
      // A reader is already waiting: hand the item straight over.
      this.waiting = false;
      this.push(chunk);
    } else {
      if (this.bufferedItems.length === this.limit) {
        this.bufferedItems.shift(); // drop the oldest item
      }
      this.bufferedItems.push(chunk);
    }
    callback();
  }
  _final(cb) {
    this.hasEnded = true;
    if (this.waiting !== false) {
      this.push(null);
    }
    cb();
  }
  _read(size) {
    if (this.bufferedItems.length === 0) {
      if (this.hasEnded) {
        this.push(null);
        return;
      }
      this.waiting = true;
      return;
    }
    this.push(this.bufferedItems.shift());
  }
}
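A hypothetical usage, assuming a producer that is faster than its consumer (both are placeholders):
// The slow consumer only ever sees the single most recent item.
fastProducer.pipe(new OnlyLatestBuffer(1)).pipe(slowConsumer);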
IO
Often, a Duplex will act as IO to an external source; a TCP connection is a great example. My favorite websocket library is websocket-driver, simply because it isn't a black-box server: instead of wrapping the body and socket in a constructor, you pipe all the TCP data into the websocket driver, which transforms it and emits the data as message events. It also transforms and sends all the data down the socket you direct it to. I especially appreciate the purity of the implementation, even if it's not the fastest.
var http = require('http'),
    websocket = require('websocket-driver');
var server = http.createServer();
server.on('upgrade', function(request, socket, body) {
  if (!websocket.isWebSocket(request)) return;
  var driver = websocket.http(request);
  driver.io.write(body); // replay any bytes that arrived with the upgrade
  socket.pipe(driver.io).pipe(socket); // raw TCP in and out of the driver
  driver.messages.on('data', function(message) {
    console.log('Got a message', message);
  });
  driver.start();
});
Transform Handle
While most transforms are implemented with the expectation that they will run in the context they were created in, transforms can actually hand their work off to other APIs or even other configurations. A simple example is the transform API: you can call this.push(item) and/or the callback at any time, which means you can even run the function offsite through an AJAX call.
const { Transform } = require('stream');
new Transform({
  transform(chunk, encoding, callback){
    // Hand each chunk to a remote API and push back whatever it returns.
    fetch("domain.com/some/api", { method: "POST", body: chunk })
      .then(function(resp){
        if(resp.status !== 200){
          return resp.text().then(function(str){
            throw str; // surface the error body to the callback
          });
        }
        return resp.text();
      }).then(callback.bind(void 0, void 0), callback); // callback(undefined, result) or callback(err)
  }
});
This can also be reapplied to target a series of workers, or even to pipe transforms on demand, e.g. with Node-RED.
Alias Repiping
Another neat thing about transforms is that they can pipe results back to themselves until no result can be re-piped. One application of this is an alias system. In an alias system, "some name" might be an alias for "closer to real", which in turn may be an alias for "/some/ridiculous/path". If you simply piped through the transform two or three times, you would only resolve a fixed depth; feeding results back in handles chains of any length.
const { Transform } = require('stream');
// The aliases from the example above; each lookup gets one step closer.
var alias_map = {
  "some name": "closer to real",
  "closer to real": "/some/ridiculous/path"
};
var resolver = new Transform({
  readableObjectMode: true, writableObjectMode: true,
  transform(chunk, encoding, callback){
    if(chunk in alias_map){
      this.write(alias_map[chunk]); // still an alias: feed it back through
    } else {
      this.push(chunk); // fully resolved: emit it
    }
    callback();
  }
});
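Writing an alias in one end pops the fully resolved value out the other:
resolver.write("some name");
resolver.on("data", function(value){
  console.log(value); // "/some/ridiculous/path"
});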
Write Streams
Promising A Finish Line
While stream-to-promise handles both write streams and read streams, you might only be interested in how it handles write streams (unless you don't like Streams at all... but I hope you'll fall in love with them!). This makes it possible for promises and Streams to coexist.
var streamToPromise = require('stream-to-promise');
// getReadableStream, transformA, transformB, and writable are placeholder streams.
getReadableStream().then(function(readable){
  readable.pipe(transformA).pipe(transformB).pipe(writable);
  return streamToPromise(writable); // resolves once writable finishes
}).then(function(){
  console.log("done");
});
Streamed UI
React is certainly a wonderful system. However, to use Streams with React, your best bet is to use Redux. Otherwise, you can turn your Writable stream into an updatable list, which updates state on each write and unpipes and ends when the component unmounts.
const { Writable } = require('stream');
const { Component } = require('react');
// How it should be (a hypothetical component that emits its lifecycle as events)
class ComponentWritable extends Writable {
  constructor(component, key){
    super({ objectMode: true });
    this.component = component;
    this.key = key;
    this.list = [];
    component.on("willUnmount", ()=>{
      this.end();
    });
    component.on("willMount", ()=>{
      this.component.setState(this.createState());
    });
  }
  _write(chunk, encoding, callback){
    this.list = this.list.concat([ chunk ]);
    // Wait for the re-render before accepting the next chunk.
    this.component.once("didUpdate", function(){
      setTimeout(callback, 0);
    });
    this.component.setState(this.createState());
  }
  createState(){
    return {
      [this.key]: this.list
    };
  }
}
// how it be
function mountWritable(component, stateKey){
  if(!component.writableStreams){
    component.writableStreams = {};
  }
  component.writableStreams[stateKey] = new Writable({
    objectMode: true,
    write: (chunk, encoding, callback)=>{
      var stream = component.writableStreams[stateKey];
      stream.val = stream.val.concat([ chunk ]);
      component.setState({
        [stateKey]: stream.val
      });
      stream.cb = callback; // held until the component re-renders
    }
  });
  component.writableStreams[stateKey].val = [];
  component.writableStreams[stateKey].cb = false;
}
// Release any pending write callbacks once the update has rendered.
function handleUpdate(component){
  component.writableStreams &&
  Object.keys(component.writableStreams).forEach(function(key){
    var stream = component.writableStreams[key];
    var cb = stream.cb;
    stream.cb = false;
    cb && cb();
  });
}
// End and detach every stream when the component goes away.
function handleUnmount(component){
  component.writableStreams &&
  Object.keys(component.writableStreams).forEach(function(key){
    var stream = component.writableStreams[key];
    stream.end();
    delete component.writableStreams[key];
  });
}
class WritableComponent extends Component {
  componentWillMount(){
    var stateKey = "some-key";
    mountWritable(this, stateKey);
  }
  componentWillUnmount(){
    handleUnmount(this);
  }
  componentDidUpdate(){
    handleUpdate(this);
  }
}
Keeping it simple
RxJS
This seems pretty standard for Angular. While I haven't looked into it too deeply, it is on my list of "standards-to-learn."
Highland.js
Highland has been in development for a long time; however, I haven't decided to bite the bullet and use it to replace Streams.
Through2
While creating Streams is fun and all, keeping things clean without creating classes is often difficult. Through2 allows you to make things fast and easy.
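For one-off transforms it's a single call (a minimal sketch, assuming through2 is installed):
const through2 = require('through2');
// An uppercasing transform with no class boilerplate.
const upper = through2(function (chunk, enc, callback) {
  this.push(chunk.toString().toUpperCase());
  callback();
});
process.stdin.pipe(upper).pipe(process.stdout);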
Events Stream
This is a library that transforms a WHATWG EventDispatcher into readable Streams.