This package brings the goodies of functional programming (map, filter, reduce) to Node.js streams.

npm install --save object-stream-tools

arrayToStream converts an existing array into a stream of objects. Useful if you want to inject/merge those objects into an existing stream.
const ost = require('object-stream-tools')

ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}])
  .on('data', data => {
    console.log(data)
  })
  .pipe(somewhereWritable)

Prints

[{foo: 'bar'}, {web: 'scale'}]

streamToSet is very useful if you want to get unique elements / a set of values.
const fs = require('fs')
const jsonStream = require('JSONStream')

fs.createReadStream('../test/data.json')
  .pipe(jsonStream.parse('*'))
  .pipe(ost.map(obj => obj.requiredProperty))
  .pipe(ost.streamToSet())
  .on('data', uniqueSet => {
    // here you get a Set of unique values; convert it to a sorted array
    const uniqueArray = Array.from(uniqueSet.values()).sort()
  })

If you just want to remove some objects from the stream, you probably want to use the filter function.
fs.createReadStream('../test/data.json')
  .pipe(jsonStream.parse('*'))
  .pipe(ost.filter(e => e.value > 6))
  // here you will get filtered objects
  .pipe(jsonStream.stringify())
  .pipe(process.stdout)

Map is useful when you want to modify existing objects in the stream.
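
To illustrate what a map stage does, here is a minimal sketch written against Node's built-in stream module only. mapSketch and collect are hypothetical helpers made up for this illustration; they are not the library's implementation, and ost.map may differ in details such as error handling.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical helper: an object-mode Transform that applies fn to every object.
function mapSketch(fn) {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      // Pass the mapped object downstream.
      callback(null, fn(chunk))
    }
  })
}

// Hypothetical helper: gather every object emitted by a stream into an array.
function collect(stream) {
  return new Promise((resolve, reject) => {
    const items = []
    stream
      .on('data', item => items.push(item))
      .on('end', () => resolve(items))
      .on('error', reject)
  })
}

// Double the value property of every object in the stream.
collect(
  Readable.from([{ value: 1 }, { value: 2 }])
    .pipe(mapSketch(obj => ({ ...obj, value: obj.value * 2 })))
).then(items => console.log(items))
```

With object-stream-tools itself, the same transformation would presumably be written as .pipe(ost.map(obj => ...)), as in the other examples of this README.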
Reduce is useful if you want to get a single object/value based on the whole stream, but you don't want to load the whole stream into memory.
Example: sum / average value of a huge stream
const jsonStream = require('JSONStream')

fs.createReadStream('./test/data.json')
  .pipe(jsonStream.parse('*'))
  // pick the property you want to reduce over
  .pipe(ost.map(obj => obj.requiredProperty))
  .pipe(ost.reduce((acc, curr, i) => {
    return acc + curr + i
  }, 0))
  .on('data', reducedValue => {
    // here you will get the reduced value
  })

Here is an example with buffered/string input and output:
const jsonStream = require('JSONStream')

fs.createReadStream('./test/data.json')
  .pipe(jsonStream.parse('*'))
  .pipe(ost.map(obj => obj.requiredProperty))
  .pipe(ost.reduce((acc, curr, i) => {
    return acc + curr + i
  }, 0))
  .on('data', reducedValue => {
    // here you will get the reduced value
  })
  .pipe(jsonStream.stringify())
  .pipe(process.stdout)

Please note that if you do not pass an initial value, the reduce function will start in (prev, curr, i) mode (see Array.prototype.reduce).
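
To make the note about the initial value concrete, here is how the two modes differ for plain Array.prototype.reduce. This is only an illustration of the semantics the note refers to; it assumes the stream version mirrors the array behaviour and does not exercise object-stream-tools itself.

```javascript
const values = [10, 20, 30]

// With an initial value: the callback runs for every element and acc starts at 0.
const indicesWithInit = []
const sumWithInit = values.reduce((acc, curr, i) => {
  indicesWithInit.push(i)
  return acc + curr
}, 0)

// Without an initial value: the first element becomes prev,
// and the callback starts at the second element (i === 1).
const indicesWithoutInit = []
const sumWithoutInit = values.reduce((prev, curr, i) => {
  indicesWithoutInit.push(i)
  return prev + curr
})

console.log(sumWithInit, indicesWithInit)       // 60 [ 0, 1, 2 ]
console.log(sumWithoutInit, indicesWithoutInit) // 60 [ 1, 2 ]
```

The sums agree here, but a reducer that uses the index i, like the one in the examples above, sees a different sequence of calls in each mode.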
promise() is very handy when you want to aggregate streams using reduce or derived calls. Keep in mind that .promise() will not work if you use only ost.map or ost.reduce, as they do not aggregate.

fs.createReadStream('../test/data.json')
  .pipe(jsonStream.parse('*'))
  .pipe(ost.streamToArray())
  .promise()
  .then(data => {
    // here you will get your aggregated data: an array of values
  })

Find is super handy if you want to quickly check whether a value/object exists in the stream. Think of it as grep on steroids.

fs.createReadStream('../test/data.json')
  .pipe(jsonStream.parse('*'))
  .pipe(ost.find(e => e.value > 6))
  .then(foundObj => {
    // here you will get the first object that matches the condition
    // or undefined when none match
  })
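
The "first match or undefined" behaviour described above can be sketched with Node's built-in streams alone. findSketch is a hypothetical helper written for this illustration, not the library's implementation; unlike ost.find it takes the source stream as an argument instead of being piped into.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: resolve with the first object that satisfies predicate,
// or with undefined if the stream ends without a match.
async function findSketch(source, predicate) {
  // Readable streams are async iterable; returning early destroys the stream,
  // so the rest of the input is not consumed.
  for await (const item of source) {
    if (predicate(item)) return item
  }
  return undefined
}

findSketch(
  Readable.from([{ value: 3 }, { value: 7 }, { value: 9 }]),
  e => e.value > 6
).then(found => console.log(found)) // { value: 7 }
```

Stopping at the first match is what makes this kind of search cheap on large streams: nothing after the matching object has to be read.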