HIGHLAND

The high-level streams library for Node.js and the browser.

$ npm install highland

Download Source code

Introduction

Re-thinking the JavaScript utility belt, Highland manages synchronous and asynchronous code easily, using nothing more than standard JavaScript and Node-like Streams. You may be familiar with Promises, EventEmitters and callbacks, but moving between them is far from seamless. Thankfully, there exists a deeper abstraction which can free our code. By updating the tools we use on Arrays, and applying them to values distributed in time instead of space, we can discard plumbing and focus on the important things. With Highland, you can switch between synchronous and asynchronous data sources at will, without having to re-write your code. Time to dive in!

Made by @caolan, with help and patience from friends - Leave a tip or fork this :)

Examples

Converting to/from Highland Streams

_([1,2,3,4]).toArray(function (xs) {
    // xs is [1,2,3,4]
});

Mapping over a Stream

var doubled = _([1,2,3,4]).map(function (x) {
    return x * 2;
});

Reading files in parallel (4 at once)

var data = _(filenames).map(readFile).parallel(4);

Handling errors

data.errors(function (err, rethrow) {
    // handle or rethrow error
});

Piping to a Node Stream

data.pipe(output);

Piping in data from Node Streams

var output = fs.createWriteStream('output');
var docs = db.createReadStream();

// wrap a node stream and pipe to file
_(docs).filter(isBlogpost).pipe(output);

// or, pipe in a node stream directly:
var through = _.pipeline(_.filter(isBlogpost));
docs.pipe(through).pipe(output);

Handling events

var clicks = _('click', btn).map(1);
var counter = clicks.scan(0, _.add);

counter.each(function (n) {
    $('#count').text(n);
});

Arrays

To work with data in Arrays, just wrap it in _(). The Highland methods are then available on it:

var shouty = _(['foo', 'bar', 'baz']).map(toUpperCase);

These methods return Stream objects, not Arrays, so you can chain together method calls:

_(['foo', 'bar', 'baz']).map(toUpperCase).map(function (x) {
    return {name: x};
});

When using the Highland APIs there is little reason to turn this back into an Array, but if you're calling an outside library you may need to convert it back:

_(['foo', 'bar', 'baz']).map(toUpperCase).toArray(function (xs) {
    // xs will now be ['FOO', 'BAR', 'BAZ']
});

Passing a function to the toArray call may seem a little unfamiliar, but this enables an important trick in Highland. Now, without changing any of your existing code, you could swap out ['foo', 'bar', 'baz'] for an asynchronous data source, and it would just work!

You can also pass Arrays into the top-level functions instead of using methods on the Stream object:

_.map(doubled, [1, 2, 3, 4])  // => 2 4 6 8

Note, this still returns a Stream.
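
For example, you can still collect the result with toArray (a small sketch, assuming doubled multiplies its argument by two):

_.map(doubled, [1, 2, 3, 4]).toArray(function (xs) {
    // xs is [2, 4, 6, 8]
});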

Async

Now, let's see how we might swap out an Array source for an asynchronous one. By passing a function to the Stream constructor we can manually push values onto the Stream:

function getData(filename) {
    // create a new Stream
    return _(function (push, next) {
        // do something async when we read from the Stream
        fs.readFile(filename, function (err, data) {
            push(err, data);
            push(null, _.nil);
        });
    });
};

First, we return a new Stream which, when read from, will read a file (this is called lazy evaluation). When fs.readFile calls its callback, we push the error and data values onto the Stream. Finally, we push _.nil onto the Stream. This is the "end of stream" marker and will tell any consumers of this stream to stop reading.

Since wrapping a callback is a fairly common thing to do, there is a convenience function:

var getData = _.wrapCallback(fs.readFile);

Now that we have an asynchronous source, we can run the exact same code from the Array examples on it:

getData('myfile').map(toUpperCase).map(function (x) {
    return {name: x};
});

With Highland, we really can have one language to work with both synchronous and asynchronous data, whether it's from a Node Stream, an EventEmitter, a callback or an Array. You can even wrap ES6 or jQuery promises:

var foo = _($.getJSON('/api/foo'));
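
A native ES6 Promise works the same way, for example (a small sketch):

var user = _(Promise.resolve({name: 'foo'}));

user.each(function (x) {
    // x is {name: 'foo'}
});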

Laziness

When you call map in Highland, it doesn't go off and immediately map over all your data. Rather it defines your intention, and the hard work occurs as you pull data from the Stream. This is 'lazy evaluation' and it's what enables Highland to manage back-pressure and also the sequencing of asynchronous actions, such as reading from a file.

var calls = 0;

var nums = _(['1', '2', '3']).map(function (x) {
    calls++;
    return Number(x);
});

// calls === 0

To get the map iterator to be called, we must pull some data from the Stream. This is called a thunk, and some Highland methods will cause one (e.g. each, toArray, pipe, resume).

nums.each(function (n) { console.log(n); });

// calls === 3

Equally, when we tell Highland to map a Stream of filenames to the readFile function, it doesn't actually go and read all the files at once; it lets us decide how we want to read them:

filenames.map(readFile).series();
filenames.map(readFile).parallel(10);

Back-pressure

Since Highland is designed to play nicely with Node Streams, it also supports back-pressure. This means that a fast source will not overwhelm a slow consumer.

fastSource.map(slowThing)

In the above example, fastSource will be paused while slowThing does its processing.

Some streams (such as those based on events) cannot be paused. In these cases data is buffered until the consumer is ready to handle it. If you expect a non-pausable source to be consumed by a slow consumer, then you should use methods such as throttle or latest to selectively drop data and regulate the flow.
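
For example (a sketch; renderPreview stands in for any slow handler):

// mousemove events can't be paused, so drop data rather than buffer it all;
// at most one event per 100ms reaches the slow consumer
_('mousemove', document).throttle(100).each(renderPreview);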

Occasionally, you'll need to split Streams in your program. At this point, Highland will force you to choose between sharing back-pressure with the new consumer, or letting the existing consumer regulate back-pressure and have the new consumer simply observe values as they arrive. Attempting to add two consumers to a Stream without calling fork or observe will throw an error.

// shared back-pressure
source.map(output1);
source.fork().map(output2);

// let the first handle backpressure and the second simply observe
source.map(output);
source.observe().map(_.log);

Currying

As well as calling functions as methods on the Stream object, Highland also exports them at the top-level.

mystream.map(doubled)
// is equivalent to
_.map(doubled, mystream)

By convention, all top-level functions are 'curryable', meaning you can partially apply their arguments. In the above example, this could be called as:

_.map(doubled)(mystream);

In real-world use, this means you can define the behaviour you'd like before knowing what Stream you'd like to perform it on:

// partially apply the filter() function to create a new function
var getBlogposts = _.filter(function (doc) {
    return doc.type === 'blogpost';
});

// now we can use the new function by completing its arguments
getBlogposts(data); // => new Stream of blogposts

You can curry your own functions too:

var myCurryableFn = _.curry(fn);
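
For example (a small sketch):

var add = _.curry(function (a, b) {
    return a + b;
});

var increment = add(1);
increment(41)  // => 42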

Events

Streams can be used to handle events as well as data, control flow and error propagation. This is often a convenient way to filter and combine events into groups, a common goal on dynamically updated sites.

var inbox = _('message', client).where({recipient: 'me'});

If you expect to receive a lot of events, and perform an async process on each of them, then you should sample the events instead of buffering all of them.

// get a frequent event source
var text = _('keyup', $('#searchbox'));

// Regulate event stream:
// - wait until no keyup events for 1s
// - when read from, only return the latest value
var searches = text.debounce(1000).latest();

// map the search events to an AJAX request
var results = searches.map(searchRequest);

// for each response, display it
results.each(function (result) {
    // display result
});

Stream Objects

_(source)

The Stream constructor accepts an array of values or a generator function as an optional argument. This is typically the entry point to the Highland APIs, providing a convenient way of chaining calls together.

Arrays - Streams created from Arrays will emit each value of the Array and then emit a nil value to signal the end of the Stream.

Generators - These are functions which provide values for the Stream. They are lazy, can be infinite, and can also be asynchronous (for example, making an HTTP request). You emit values on the Stream by calling push(err, val), much like a standard Node.js callback. You call next() to signal you've finished processing the current data. If the Stream is still being consumed the generator function will then be called again.

You can also redirect a generator Stream by passing a new source Stream to next, for example next(other_stream). Any subsequent reads will then be made from the new source (see the redirect example at the end of the constructor examples below).

Node Readable Stream - Pass in a Node Readable Stream object to wrap it with the Highland API. Reading from the resulting Highland Stream will begin piping the data from the Node Stream to the Highland Stream.

EventEmitter / jQuery Elements - Pass in both an event name and an event emitter as the two arguments to the constructor and the first argument emitted to the event handler will be written to the new Stream.

You can also pass a function, an array of strings or a number as an optional third parameter. In this case the event handler will try to wrap the arguments emitted to it and write the resulting object to the new stream.

Promise - Accepts an ES6 / jQuery style promise and returns a Highland Stream which will emit a single value (or an error).

Parameters
  • source - Array | Function | Readable Stream | Promise - (optional) source to take values from
// from an Array
_([1, 2, 3, 4]);

// using a generator function
_(function (push, next) {
    push(null, 1);
    push(new Error('something went wrong'));
    next();
});

// a stream with no source, can pipe node streams through it etc.
var through = _();

// wrapping a Node Readable Stream so you can easily manipulate it
_(readable).filter(hasSomething).pipe(writeable);

// creating a stream from events
_('click', btn).each(handleEvent);

// creating a stream from events with mapping
_('request', httpServer, ['req', 'res']).each(handleEvent);

// from a Promise object
var foo = _($.getJSON('/api/foo'));
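
// redirecting a generator to a new source with next(stream)
// (a sketch: other_stream stands in for any existing Highland Stream)
_(function (push, next) {
    push(null, 'first value');
    // all further values will be read from other_stream
    next(other_stream);
});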
Stream.destroy()

Destroys a stream by unlinking it from any consumers and sources. This will stop all consumers from receiving events from this stream and removes this stream as a consumer of any source stream.

This function calls end() on the stream and unlinks it from any piped-to streams.

Stream.end()

Ends a Stream. This is the same as sending a nil value as data. You shouldn't need to call this directly; rather, it will be called by any Node Readable Streams you pipe in.

mystream.end();
Stream.pause()

Pauses the stream. All Highland Streams start in the paused state.

var xs = _(generator);
xs.pause();
Stream.resume()

Resumes a paused Stream. This will either read from the Stream's incoming buffer or request more data from an upstream source.

var xs = _(generator);
xs.resume();
Stream.write(x)

Writes a value to the Stream. If the Stream is paused it will go into the Stream's incoming buffer, otherwise it will be immediately processed and sent to the Stream's consumers (if any). Returns false if the Stream is paused, true otherwise. This lets Node's pipe method handle back-pressure.

You shouldn't need to call this yourself, but it may be called by Node functions which treat Highland Streams as a Node Writable Stream.

Parameters
  • x - the value to write to the Stream
var xs = _();
xs.write(1);
xs.write(2);
xs.end();

xs.toArray(function (ys) {
    // ys will be [1, 2]
});

Transforms

Stream.append(y)

Adds a value to the end of a Stream.

Parameters
  • y - the value to append to the Stream
_([1, 2, 3]).append(4)  // => 1, 2, 3, 4
Stream.batch(n)

Takes one Stream and batches incoming data into arrays of a given length.

Parameters
  • n - Number - length of the array to batch
_([1, 2, 3, 4, 5]).batch(2)  // => [1, 2], [3, 4], [5]
Stream.collect()

Groups all values into an Array and passes it down the stream as a single data event. This is a bit like doing toArray, but instead of accepting a callback and causing a thunk, it passes the value on.

_(['foo', 'bar']).collect().toArray(function (xs) {
    // xs will be [['foo', 'bar']]
});
Stream.compact()

Filters a Stream to drop all non-truthy values.

var compacted = _([0, 1, false, 3, null, undefined, 6]).compact();
// => [1, 3, 6]
Stream.consume(f)

Consumes values from a Stream (once resumed) and returns a new Stream for you to optionally push values onto using the provided push / next functions.

This function forms the basis of many higher-level Stream operations. It will not cause a paused stream to immediately resume, but behaves more like a 'through' stream, handling values as they are read.

Parameters
  • f - Function - the function to handle errors and values
var filter = function (f, source) {
    return source.consume(function (err, x, push, next) {
        if (err) {
            // pass errors along the stream and consume next value
            push(err);
            next();
        }
        else if (x === _.nil) {
            // pass nil (end event) along the stream
            push(null, x);
        }
        else {
            // pass on the value only if the value passes the predicate
            if (f(x)) {
                push(null, x);
            }
            next();
        }
    });
};
Stream.debounce(ms)

Holds off pushing data events downstream until there has been no more data for ms milliseconds. Sends the last value that occurred before the delay, discarding all other values.

Parameters
  • ms - Number - the milliseconds to wait before sending data
// sends last keyup event after user has stopped typing for 1 second
_('keyup', textbox).debounce(1000);
Stream.doto(f)

Creates a new Stream which applies a function to each value from the source and re-emits the source value. Useful when you want to mutate the value or perform side effects.

Parameters
  • f - the function to apply
var appended = _([[1], [2], [3], [4]]).doto(function (x) {
    x.push(1);
});

_([1, 2, 3]).doto(console.log)
// 1
// 2
// 3
// => 1, 2, 3
Stream.errors(f)

Extracts errors from a Stream and applies them to an error handler function. Returns a new Stream with the errors removed (unless the error handler chooses to rethrow them using push). Errors can also be transformed and put back onto the Stream as values.

Parameters
  • f - Function - the function to pass all errors to
getDocument.errors(function (err, push) {
    if (err.statusCode === 404) {
        // not found, return empty doc
        push(null, {});
    }
    else {
        // otherwise, re-throw the error
        push(err);
    }
});
Stream.filter(f)

Creates a new Stream including only the values which pass a truth test.

Parameters
  • f - the truth test function
var evens = _([1, 2, 3, 4]).filter(function (x) {
    return x % 2 === 0;
});
Stream.find(f)

A convenient form of filter, which returns the first object from a Stream that passes the provided truth test.

Parameters
  • f - Function - the truth test function
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'foo'}
];

var f = function (x) {
    return x.type == 'blogpost';
};

_(docs).find(f);
// => {type: 'blogpost', title: 'foo'}

// example with partial application
var firstBlogpost = _.find(f);

firstBlogpost(docs)
// => {type: 'blogpost', title: 'foo'}
Stream.group(f)

A convenient form of reduce, which groups items based on a function or property name.

Parameters
  • f - Function | String - the function or property name on which to group; toString() is called on the result of a function
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'foo'}
];

var f = function (x) {
    return x.type;
};

_(docs).group(f);  // or equivalently: _(docs).group('type');
// => {
// =>    'blogpost': [{type: 'blogpost', title: 'foo'}, {type: 'blogpost', title: 'bar'}]
// =>    'comment': [{type: 'comment', title: 'foo'}]
// =>  }
Stream.invoke(method, args)

Calls a named method on each object from the Stream - returning a new stream with the result of those calls.

Parameters
  • method - String - the method name to call
  • args - Array - the arguments to call the method with
_(['foo', 'bar']).invoke('toUpperCase', [])  // => FOO, BAR

filenames.map(readFile).sequence().invoke('toString', ['utf8']);
Stream.last()

Drops all values from the Stream apart from the last one (if any).

_([1, 2, 3, 4]).last()  // => 4
Stream.latest()

Creates a new Stream, which when read from, only returns the last seen value from the source. The source stream does not experience back-pressure. Useful if you're using a Stream to model a changing property which you need to query periodically.

// slowThing will always get the last known mouse position
// when it asks for more data from the mousePosition stream
mousePosition.latest().map(slowThing)
Stream.map(f)

Creates a new Stream of transformed values by applying a function to each value from the source. The transformation function can be replaced with a non-function value for convenience, and it will emit that value for every data event on the source Stream.

Parameters
  • f - the transformation function or value to map to
var doubled = _([1, 2, 3, 4]).map(function (x) {
    return x * 2;
});

_([1, 2, 3]).map('hi')  // => 'hi', 'hi', 'hi'
Stream.pluck(property)

Retrieves values associated with a given property from all elements in the collection.

Parameters
  • property - String - the name of the property to retrieve from each element
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'baz'}
];

_(docs).pluck('title').toArray(function (xs) {
   // xs is now ['foo', 'bar', 'baz']
});
Stream.reduce(memo, iterator)

Boils down a Stream to a single value. The memo is the initial state of the reduction, and each successive step of it should be returned by the iterator function. The iterator is passed two arguments: the memo and the next value.

If the iterator throws an error, the reduction stops and the resulting stream will emit that error instead of a value.

Parameters
  • memo - the initial state of the reduction
  • iterator - Function - the function which reduces the values
var add = function (a, b) {
    return a + b;
};

_([1, 2, 3, 4]).reduce(0, add)  // => 10
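
If the iterator can throw, the resulting error can be handled downstream with errors (a sketch of the behaviour described above):

_([1, 2, 'three']).reduce(0, function (a, b) {
    if (typeof b !== 'number') {
        throw new Error('not a number');
    }
    return a + b;
}).errors(function (err) {
    // err.message === 'not a number'; no reduced value is emitted
});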
Stream.reduce1(iterator)

Same as reduce, but uses the first element as the initial state instead of passing in a memo value.

Parameters
  • iterator - Function - the function which reduces the values
_([1, 2, 3, 4]).reduce1(add)  // => 10
Stream.reject(f)

The inverse of filter.

Parameters
  • f - Function - the truth test function
var odds = _([1, 2, 3, 4]).reject(function (x) {
    return x % 2 === 0;
});
Stream.scan(memo, iterator)

Like reduce, but emits each intermediate value of the reduction as it is calculated.

If the iterator throws an error, the scan will stop and the stream will emit that error. Any intermediate values that were produced before the error will still be emitted.

Parameters
  • memo - the initial state of the reduction
  • iterator - Function - the function which reduces the values
_([1, 2, 3, 4]).scan(0, add)  // => 0, 1, 3, 6, 10
Stream.scan1(iterator)

Same as scan, but uses the first element as the initial state instead of passing in a memo value.

Parameters
  • iterator - Function - the function which reduces the values
_([1, 2, 3, 4]).scan1(add)  // => 1, 3, 6, 10
Stream.stopOnError(f)

Like the errors method, but emits a Stream end marker after an Error is encountered.

Parameters
  • f - Function - the function to handle an error
brokenStream.stopOnError(function (err) {
    console.error('Something broke: ' + err);
});
Stream.take(n)

Creates a new Stream with the first n values from the source.

Parameters
  • n - Number - integer representing number of values to read from source
_([1, 2, 3, 4]).take(2) // => 1, 2
Stream.throttle(ms)

Ensures that only one data event is pushed downstream (or into the buffer) every ms milliseconds; any other values are dropped.

Parameters
  • ms - Number - the minimum milliseconds between each value
_('mousemove', document).throttle(1000);
Stream.where(props)

A convenient form of filter, which returns all objects from a Stream that match a set of property values.

Parameters
  • props - Object - the properties to match against
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'foo'}
];

_(docs).where({title: 'foo'})
// => {type: 'blogpost', title: 'foo'}
// => {type: 'comment', title: 'foo'}

// example with partial application
var getBlogposts = _.where({type: 'blogpost'});

getBlogposts(docs)
// => {type: 'blogpost', title: 'foo'}
// => {type: 'blogpost', title: 'bar'}

Higher-order Streams

Stream.concat(ys)

Concatenates a Stream to the end of this Stream.

Be aware that in the top-level export the arguments may be in the reverse order to what you'd expect: _.concat(ys, xs) appends ys to the end of xs, following the convention of other top-level exported functions which do x to y.

Parameters
  • ys - Stream | Array - the values to concatenate onto the end of the Stream
_([1, 2]).concat([3, 4])  // => 1, 2, 3, 4
_.concat([3, 4], [1, 2])  // => 1, 2, 3, 4
Stream.flatFilter(f)

Filters using a predicate which returns a Stream. If you need to check against an asynchronous data source when filtering a Stream, this can be convenient. The Stream returned from the filter function should have a Boolean as its first value (all other values on the Stream will be disregarded).

Parameters
  • f - Function - the truth test function which returns a Stream
var checkExists = _.wrapCallback(fs.exists);
filenames.flatFilter(checkExists)
Stream.flatMap(f)

Creates a new Stream of values by applying each item in a Stream to an iterator function which must return a (possibly empty) Stream. Each item on these result Streams is then emitted on a single output Stream.

Parameters
  • f - Function - the iterator function
filenames.flatMap(readFile)
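
For a purely synchronous illustration (a sketch):

_([1, 2, 3]).flatMap(function (x) {
    return _([x, x * 10]);
});
// => 1, 10, 2, 20, 3, 30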
Stream.flatten()

Recursively reads values from a Stream which may contain nested Streams or Arrays. As values or errors are encountered, they are emitted on a single output Stream.

_([1, [2, 3], [[4]]]).flatten();  // => 1, 2, 3, 4

var nums = _([
    _([1, 2, 3]),
    _([4, _([5, 6]) ])
]);

nums.flatten();  // => 1, 2, 3, 4, 5, 6
Stream.fork()

Forks a stream, allowing you to add additional consumers with shared back-pressure. A stream forked to multiple consumers will only pull values from its source as fast as the slowest consumer can handle them.

var xs = _([1, 2, 3, 4]);
var ys = xs.fork();
var zs = xs.fork();

// no values will be pulled from xs until zs also resumes
ys.resume();

// now both ys and zs will get values from xs
zs.resume();
Stream.merge()

Takes a Stream of Streams and merges their values and errors into a single new Stream. The merged stream ends when all source streams have ended.

Note that no guarantee is made with respect to the order in which values for each stream end up in the merged stream. Values in the merged stream will, however, respect the order they were emitted from their respective streams.

var txt = _(['foo.txt', 'bar.txt']).map(readFile)
var md = _(['baz.md']).map(readFile)

_([txt, md]).merge();
// => contents of foo.txt, bar.txt and baz.md in the order they were read
Stream.observe()

Observes a stream, allowing you to handle values as they are emitted, without adding back-pressure or causing data to be pulled from the source. This can be useful when you are performing two related queries on a stream where one would block the other. Just be aware that a slow observer could fill up its buffer and cause memory issues. Where possible, you should use fork.

var xs = _([1, 2, 3, 4]);
var ys = xs.fork();
var zs = xs.observe();

// now both zs and ys will receive data as fast as ys can handle it
ys.resume();
Stream.otherwise(ys)

Switches source to an alternate Stream if the current Stream is empty.

Parameters
  • ys - Stream - alternate stream to use if this stream is empty
_([1,2,3]).otherwise(['foo'])  // => 1, 2, 3
_([]).otherwise(['foo'])       // => 'foo'

_.otherwise(_(['foo']), _([1,2,3]))    // => 1, 2, 3
_.otherwise(_(['foo']), _([]))         // => 'foo'
Stream.parallel(n)

Takes a Stream of Streams and reads from them in parallel, buffering the results until they can be returned to the consumer in their original order.

Parameters
  • n - Number - the maximum number of concurrent reads/buffers
var readFile = _.wrapCallback(fs.readFile);
var filenames = _(['foo.txt', 'bar.txt', 'baz.txt']);

// read from up to 10 files at once
filenames.map(readFile).parallel(10);
_.pipeline(...)

Creates a 'Through Stream', which passes data through a pipeline of functions or other through Streams. This is particularly useful when combined with partial application of Highland functions to expose a Node-compatible Through Stream.

This is not a method on a Stream; it is only exposed at the top level as _.pipeline. It takes an arbitrary number of arguments.

var through = _.pipeline(
    _.map(parseJSON),
    _.filter(isBlogpost),
    _.reduce(collectCategories),
    _.through(otherPipeline)
);

readStream.pipe(through).pipe(outStream);

// Alternatively, you can use pipeline to manipulate a stream in
// the chained method call style:

var through2 = _.pipeline(function (s) {
    return s.map(parseJSON).filter(isBlogpost); // etc.
});
Stream.sequence()

Reads values from a Stream of Streams, emitting them on a single output Stream. This can be thought of as a flatten, just one level deep. Often used for resolving asynchronous actions such as an HTTP request or reading a file.

var nums = _([
    _([1, 2, 3]),
    _([4, 5, 6])
]);

nums.sequence()  // => 1, 2, 3, 4, 5, 6

// using sequence to read from files in series
filenames.map(readFile).sequence()
Stream.series()

An alias for the sequence method.

filenames.map(readFile).series()
Stream.through(target)

Passes the current Stream to a function, returning the result. Can also be used to pipe the current Stream through another Stream. It will always return a Highland Stream (rather than the target Stream directly, as Node's pipe does).

function oddDoubler(s) {
    return s.filter(function (x) {
        return x % 2; // odd numbers only
    })
    .map(function (x) {
        return x * 2;
    });
}

_([1, 2, 3, 4]).through(oddDoubler).toArray(function (xs) {
    // xs will be [2, 6]
});

// Can also be used with Node Through Streams
_(filenames).through(jsonParser).map(function (obj) {
    // ...
});
Stream.zip(ys)

Takes two Streams and returns a Stream of corresponding pairs.

Parameters
  • ys - Array | Stream - the other stream to combine values with
_(['a', 'b', 'c']).zip([1, 2, 3])  // => ['a', 1], ['b', 2], ['c', 3]

Consumption

Stream.apply(f)

Applies results from a Stream as arguments to a function.

Parameters
  • f - Function - the function to apply arguments to
_([1, 2, 3]).apply(function (a, b, c) {
    // a === 1
    // b === 2
    // c === 3
});
Stream.each(f)

Iterates over every value from the Stream, calling the iterator function on each of them. This function causes a thunk.

If an error from the Stream reaches the each call, it will emit an error event (which will cause it to throw if unhandled).

Parameters
  • f - Function - the iterator function
_([1, 2, 3, 4]).each(function (x) {
    // will be called 4 times with x being 1, 2, 3 and 4
});
Stream.pipe(dest)

Pipes a Highland Stream to a Node Writable Stream (Highland Streams are also Node Writable Streams). This will pull all the data from the source Highland Stream and write it to the destination, automatically managing flow so that the destination is not overwhelmed by a fast source.

This function returns the destination so you can chain together pipe calls.

Parameters
  • dest - Writable Stream - the destination to write all data to
var source = _(generator);
var dest = fs.createWriteStream('myfile.txt')
source.pipe(dest);

// chained call
source.pipe(through).pipe(dest);
Stream.pull(f)

Consumes a single item from the Stream. Unlike consume, this function will not provide a new stream for you to push values onto, and it will unsubscribe as soon as it has a single error, value or nil from the source.

You probably won't need to use this directly, but it is used internally by some functions in the Highland library.

Parameters
  • f - Function - the function to handle data
xs.pull(function (err, x) {
    // do something
});
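
For example (a small sketch):

var xs = _([1, 2, 3]);

xs.pull(function (err, x) {
    // x === 1; pulling again would give 2, then 3
});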
Stream.toArray(f)

Collects all values from a Stream into an Array and calls a function once with the result. This function causes a thunk.

If an error from the Stream reaches the toArray call, it will emit an error event (which will cause it to throw if unhandled).

Parameters
  • f - Function - the callback to provide the completed Array to
_([1, 2, 3, 4]).toArray(function (x) {
    // parameter x will be [1,2,3,4]
});

Utils

_.isStream(x)

Returns true if x is a Highland Stream.

Parameters
  • x - the object to test
_.isStream('foo')  // => false
_.isStream(_([1,2,3]))  // => true
_.log(args..)

Logs values to the console, a simple wrapper around console.log that is suitable for passing to other functions by reference without having to call bind.

_.log('Hello, world!');

_([1, 2, 3, 4]).each(_.log);

_.nil

The end of stream marker. This is sent along the data channel of a Stream to tell consumers that the Stream has ended. See the example map code for an example of detecting the end of a Stream.

Note: nil is set up as a global where possible. This makes it convenient to access, but more importantly lets Streams from different Highland instances work together and detect end-of-stream properly. This is mostly useful for npm, where you may have many different Highland versions installed.

var map = function (iter, source) {
    return source.consume(function (err, val, push, next) {
        if (err) {
            push(err);
            next();
        }
        else if (val === _.nil) {
            push(null, val);
        }
        else {
            push(null, iter(val));
            next();
        }
    });
};
_.wrapCallback(f)

Wraps a node-style async function which accepts a callback, transforming it to a function which accepts the same arguments minus the callback and returns a Highland Stream instead. Only the first argument to the callback (or an error) will be pushed onto the Stream.

Parameters
  • f - Function - the node-style function to wrap
var fs = require('fs');

var readFile = _.wrapCallback(fs.readFile);

readFile('example.txt').apply(function (data) {
    // data is now the contents of example.txt
});

Objects

_.extend(a, b)

Extends one object with the properties of another. Note: The arguments are in the reverse order of other libraries such as underscore. This is so it follows the convention of other functions in this library and so you can more meaningfully partially apply it.

Parameters
  • a - Object - the properties to extend b with
  • b - Object - the original object to extend
_.extend({name: 'bar'}, {name: 'foo', price: 20})
// => {name: 'bar', price: 20}

// example of partial application
var publish = _.extend({published: true});

publish({title: 'test post'})
// => {title: 'test post', published: true}
_.get(prop, obj)

Returns a property from an object.

Parameters
  • prop - String - the property to return
  • obj - Object - the object to read properties from
var obj = {foo: 'bar', baz: 123};
_.get('foo', obj) // => 'bar'

// making use of partial application
var posts = [
  {title: 'one'},
  {title: 'two'},
  {title: 'three'}
];

_(posts).map(_.get('title'))  // => 'one', 'two', 'three'
_.keys(obj)

Returns keys from an Object as a Stream.

Parameters
  • obj - Object - the object to return keys from
_.keys({foo: 1, bar: 2, baz: 3})  // => 'foo', 'bar', 'baz'
_.pairs(obj)

Returns key/value pairs for an Object as a Stream. Reads properties lazily, so if you don't read from all keys on an object, not all properties will be read from (may have an effect where getters are used).

Parameters
  • obj - Object - the object to return key/value pairs from
_.pairs({foo: 1, bar: 2})  // => ['foo', 1], ['bar', 2]
_.set(prop, value, obj)

Updates a property on an object, returning the updated object.

Parameters
  • prop - String - the property to set
  • value - the value to set the property to
  • obj - Object - the object to set properties on
var obj = {foo: 'bar', baz: 123};
_.set('foo', 'wheeee', obj) // => {foo: 'wheeee', baz: 123}

// making use of partial application
var publish = _.set('published', true);

publish({title: 'example'})  // => {title: 'example', published: true}
_.values(obj)

Returns values from an Object as a Stream. Reads properties lazily, so if you don't read from all keys on an object, not all properties will be read from (may have an effect where getters are used).

Parameters
  • obj - Object - the object to return values from
_.values({foo: 1, bar: 2, baz: 3})  // => 1, 2, 3

Functions

compose(fn1, fn2, ...)

Creates a composite function, which is the application of function1 to the results of function2. You can pass an arbitrary number of arguments and have them composed. This means you can't partially apply the compose function itself.

var add1 = add(1);
var mul3 = mul(3);

var add1mul3 = compose(mul3, add1);
add1mul3(2) == 9
curry(fn, [*arguments])

Transforms a function with specific arity (all arguments must be defined) so that it can be called as a chain of functions until the argument list is saturated.

This function is not itself curryable.

Parameters
  • fn - Function - the function to curry
  • args.. - any number of arguments to pre-apply to the function
fn = curry(function (a, b, c) {
    return a + b + c;
});

fn(1)(2)(3) == fn(1, 2, 3)
fn(1, 2)(3) == fn(1, 2, 3)
fn(1)(2, 3) == fn(1, 2, 3)
flip(fn, [x, y])

Evaluates the function fn with the argument positions swapped. Only works with functions that accept two arguments.

Parameters
  • f - Function - function to flip argument application for
  • x - parameter to apply to the right hand side of f
  • y - parameter to apply to the left hand side of f
div(2, 4) == 0.5
flip(div, 2, 4) == 2
flip(div)(2, 4) == 2
ncurry(n, fn, [args...])

Same as curry, but with a specific number of arguments. This can be useful when functions do not explicitly define all their parameters.

This function is not itself curryable.

Parameters
  • n - Number - the number of arguments to wait for before applying fn
  • fn - Function - the function to curry
  • args... - any number of arguments to pre-apply to the function
fn = ncurry(3, function () {
    return Array.prototype.join.call(arguments, '.');
});

fn(1, 2, 3) == '1.2.3';
fn(1, 2)(3) == '1.2.3';
fn(1)(2)(3) == '1.2.3';
partial(fn, args...)

Partially applies the function (regardless of whether it has had curry called on it). This will always postpone execution until at least the next call of the partially applied function.

Parameters
  • fn - Function - the function to partially apply
  • args... - the arguments to apply to the function
var addAll = function () {
    var args = Array.prototype.slice.call(arguments);
    return foldl1(add, args);
};
var f = partial(addAll, 1, 2);
f(3, 4) == 10
seq(fn1, fn2, ...)

The reversed version of compose, where arguments are in the order of application.

var add1 = add(1);
var mul3 = mul(3);

var add1mul3 = seq(add1, mul3);
add1mul3(2) == 9

Operators

_.add(a, b)

Add two values. Can be partially applied.

add(1, 2) === 3
add(1)(5) === 6
_.not(x)

Perform logical negation on a value. If x is truthy then returns false, otherwise returns true.

Parameters
  • x - the value to negate
_.not(true)   // => false
_.not(false)  // => true