var Readable = require('readable-stream/readable'); var util = require('util'); function isReadable(stream) { if (typeof stream.pipe !== 'function') { return false; } if (!stream.readable) { return false; } if (typeof stream._read !== 'function') { return false; } if (!stream._readableState) { return false; } return true; } function addStream (streams, stream) { if (!isReadable(stream)) { throw new Error('All input streams must be readable'); } var self = this; stream._buffer = []; stream.on('readable', function () { var chunk = stream.read(); while (chunk) { if (this === streams[0]) { self.push(chunk); } else { this._buffer.push(chunk); } chunk = stream.read(); } }); stream.on('end', function () { for (var stream = streams[0]; stream && stream._readableState.ended; stream = streams[0]) { while (stream._buffer.length) { self.push(stream._buffer.shift()); } streams.shift(); } if (!streams.length) { self.push(null); } }); stream.on('error', this.emit.bind(this, 'error')); streams.push(stream); } function OrderedStreams (streams, options) { if (!(this instanceof(OrderedStreams))) { return new OrderedStreams(streams, options); } streams = streams || []; options = options || {}; options.objectMode = true; Readable.call(this, options); if (!Array.isArray(streams)) { streams = [streams]; } if (!streams.length) { return this.push(null); // no streams, close } var addStreamBinded = addStream.bind(this, []); streams.forEach(function (item) { if (Array.isArray(item)) { item.forEach(addStreamBinded); } else { addStreamBinded(item); } }); } util.inherits(OrderedStreams, Readable); OrderedStreams.prototype._read = function () {}; module.exports = OrderedStreams;