Let the queue flow through you, Skywalker.
queue-flow is a library for Javascript source code organization built on the concepts of queues, flowcharts, and functional programming. This tutorial will introduce these concepts, explain how they are combined to produce a useful whole, and then introduce the API and syntax over time while discussing various patterns in synchronous code and how they are replicated with queue-flow.
For those who don’t speak British English (myself included), “queues” may not be high in your vocabulary. Fortunately, a queue in computer science is very much like the standard definition.
Think of a line in a supermarket. The first person in the line is the first to be served, the second person is the second, and so on: First In, First Out.
But when will your turn come up? If there is only one register in front of your line/queue, then you have to wait for every person in front of you to be served entirely. But if there are four registers, you will be served by whichever of the four registers finishes with their client when you are at the front of the line.
This can be translated directly to software queues. Items (clients) are queued up, and workers (registers) dequeue and process (serve) the items.
Queues can also lead into other queues, much like a factory floor for vehicles. In this situation, a piece of data is processed by one worker, then passed into another queue to be worked on by the next, and so on. In an ideal situation, there would be space for only one item in a queue, like the conveyor belt on the factory floor, because each step is broken into roughly equal amounts of processing time and all of the workers are kept equally busy.
When trying to optimize, tasks that take longer could be done in parallel by a few workers, bringing the average time down to match the faster tasks. If each item involved is independent of the next, as with grocery checkout or car-making, then this is a valid step to take.
Later on, if demand starts to exceed the throughput of this tuned queue, but the processing time of any one item is tolerable, performance can be increased by simply duplicating the tuned queue as many times as necessary for the demand.
Chained queues, with processing broken across many small pieces as described above, allow for that sort of performance optimization with little change to the process (code) involved. It worked for Ford; it can work for your code, too!
But much software isn't doing the same things over and over, like a car factory. It has to adapt to what its users want, and depending on what is needed, different things may need to be done. Shoehorning that into a single, linear queue is only asking for trouble.
Flowcharts are basic, high-level descriptions of things to do, with simple sentences and questions denoting actions and decisions. The actual process is an implementation detail. Imperative flow control is directly translatable from flowcharts, with functions serving as the implementation detail. With classic functions and imperative flow control, though, only one part of the flowchart is in use at a time, and each use of the flowchart must wait for the prior use to complete.
Queues that handle a particular action or decision in a flowchart could just as easily be substituted in for functions. At the end of each queue, the result could be pushed onto the next queue, which is either pre-determined (a linear flow) or decided upon based on some criteria about the data in question (a branch). But with queues, one piece of data could be half-way through the flowchart built from the queues while another piece is just starting and another is finishing. There could even be another piece working on the exact same step if there are multiple workers.
Everything said before about the advantages of queues applies to a flowchart built from queues – a queue flow – and the source code organization can remain roughly the same as with imperative-style code.
So if this is so awesome, why isn't it more commonly in use? Essentially, in order to implement queues that handle sync and async calls identically, queue-flow uses functional programming concepts for implementing the steps, and functional programming requires more discipline than imperative programming.
What is functional programming? Programming with functions, of course! But that tautology doesn't really explain things. In functional programming languages, your language constructs – all of them – are functions. Conditions, loops, etc., and manipulation of functions to produce new functions is the order of the day. You know "higher order functions" as functions that take a callback function. Javascript's bind implements the "partial application" concept: it sets one or more of the first few arguments of a function and returns a function that takes the remaining arguments. The Javascript Array object now has several methods for functional data processing – methods that take an array and a function to perform an operation on each element of the array according to a particular set of rules. forEach, map, reduce, filter, every, and some are the ECMAScript 5 methods added to Array that are most closely analogous to queue-flow.
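For example, here is a minimal sketch of partial application with bind (the add function is purely illustrative):
function add(a, b) {
    return a + b;
}
var addFive = add.bind(null, 5); // the first argument is now fixed to 5
console.log(addFive(3)); // 8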
With these methods, the entire array, the entire set of values to operate on, is already known, so these methods are all synchronous. Since they are synchronous only, there is no penalty for completely finishing the processing of one operation before starting the next. And since map and filter return new arrays, they can be chained to each other.
Because each step is a separate function, they share the same outer scope, but they cannot affect each other's inner scopes, so their callbacks are pushed to be more "pure" and to depend only on their own definition and the particular value in the array they are currently operating on.
queue-flow tries to mimic the ECMAScript 5 Array processing methods in syntax as much as possible. A queue, conceptually, is very similar to an array. With arrays, all of the values are queued up-front and then picked off one at a time by their processing method. But the assumption that the callbacks must also be synchronous, just because the array is loaded synchronously in the code, should not have been baked into the processing methods. What if you want to take an array of URLs, query them, and when finished load the images into your web app? You can't use map or reduce on the array, because they will return long before any image data is available – these requests need to pass through the implicit Javascript Event Loop while waiting on the remote web server to return the image data.
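To make the problem concrete, here is a sketch of what goes wrong; urls and loadImage are hypothetical, with loadImage handing its result to a callback:
var images = urls.map(function(url) {
    loadImage(url, function(img) {
        // the image arrives here, long after map has already returned
    });
    // nothing useful can be returned synchronously
});
console.log(images); // an array of undefined values, not images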
And even if you could use the ECMAScript 5 methods in a piece of code, the moment you need to make a decision one way or another, you have to save the state of the array, write some imperative code to decide which branch of code to use, and then continue with the functional programming. If you have to use imperative code anyway, you'll be less likely to change your habits and truly write small, re-usable data processing functions that fully house their own logic – your functions will just look like blocks of imperative code and be larger than necessary.
So, queue-flow fixes a few things: its processing methods handle asynchronous callbacks just as readily as synchronous ones, and decisions (branches) can be expressed within the flow itself instead of dropping back into imperative code. The syntax of queue-flow has been intentionally kept close to the ECMAScript 5 Array processing methods syntax.
var result = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    .map(function(val) {
        return val*2;
    })
    .reduce(function(cum, cur) {
        return cum + cur;
    }, 0);
becomes
q([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .map(function(val) {
        return val*2;
    })
    .reduce(function(cum, cur) {
        return cum + cur;
    }, function(result) {
        ...
    }, 0);
Where the built-in Array methods operate directly on an array, with queue-flow we must first construct a queue and operate on that. Where each of the Array methods returns a new Array when it completes its data processing, queue-flow creates new queues immediately and pushes the results of the previous queue into the next queue as they become available. Where the Array methods assume their callbacks are synchronous, and are synchronous themselves, so the result can be assigned inline, queue-flow passes the results along to the next queue or callback (one of each in this example).
Now that we see how similar they can be, let's take a look at how far they can diverge. First of all, "finally"-style callbacks, like the one in that reduce, suck. Your continuing code is now nested a level deep, but it is not conceptually another O(n) depth, it's just what must come next, linearly. So let's remove it.
q([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .map(function(val) {
        return val*2;
    })
    .reduce(function(cum, cur) {
        return cum + cur;
    }, 'nextQueue', 0);
So, we just sent the final result value into a queue called 'nextQueue'. But how do we define that queue?
q('nextQueue')
    .each(console.log);
Simple. Just tell queue-flow you want that named queue and then start adding processing to it. This time we used each, which simply passes the value into the provided callback function and then passes the original value on to the next queue, ignoring whatever the callback returns. And we just passed it a named function directly with no wrapper anonymous function, so it's nicely readable. Only if you have several functions to execute that don't directly build on top of each other – the way so(like(just(value))) does – would you need to actually write a new function, because that same nested behavior can be accomplished:
q([value])
    .map(just)
    .map(like)
    .map(so);
It may not seem like much of a difference, but consider two things. First, the moment any of just, like, or so becomes async, so(like(just(value))) will blow up in your face, while the queued version keeps working unchanged. Second, did you notice the .each(console.log)? This little line can help you easily debug async processes:
q([value])
    .each(console.log) // value
    .map(just)
    .each(console.log) // just(value)
    .map(like)
    .each(console.log) // like(just(value))
    .map(so)
    .each(console.log) // so(like(just(value)))
They can be inserted and removed easily wherever you're having problems understanding the flow of code, but they don't require any modifications to the functions you're calling.
To make a callback an async function, all you have to do is accept one more argument than the method you provide the callback to expects, so the original example could be written asynchronously like so:
q([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .map(function(val, callback) {
        callback(val*2);
    })
    .reduce(function(cum, cur, callback) {
        callback(cum + cur);
    }, 'nextQueue', 0);
If your function doesn't have explicit arguments, function() { ... }, then queue-flow assumes that it is a synchronous function. If this is not the case, you can wrap your function in q.async() to force queue-flow to treat it as an async function.
q('someQueue')
    .map(q.async(function() {
        var val = arguments[0];
        var callback = arguments[1];
        callback(val);
    }));
This may seem pointless, but one of the most complex methods in queue-flow, node, could provide your callback with an unpredictable number of arguments, making it necessary to manipulate the arguments object directly.
A method in queue-flow, branch, is designed to get different named queues to "work together", and has no analog among the ECMAScript 5 Array methods.
q('someQueue')
    .map(someAction)
    .branch('someOtherQueue');
q('someOtherQueue')
    .each(console.log);
This usage of branch is limited, but has some uses, such as rejoining a queue that branches into two queues for a few steps. branch can also take a function that it provides a queued value to, and expects it to return the name of the branch the value should be pushed to.
q('numbers')
    .branch(function(val) {
        return val > 100 ? 'big' : 'small';
    });
q('big')
    .each(function(val) {
        console.log(val + ' is a BIG number!');
    });
q('small')
    .each(function(val) {
        console.log('Here we see ' + val + ', a very small number, indeed!');
    });
You can also pass an actual queue reference to branch (or have a function return that reference) rather than passing a name directly. This can be useful for creating "sub-queues" that live in a function: the function is given the destination queue and returns its source queue (which the outer queue can branch into in the same way).
function subQueue(destination) {
    var localQ = q.ns()();
    localQ
        .map(doSomething)
        .filter(removeUndesirableValues)
        .branch(destination);
    return localQ;
}
q('someQueue')
    .node(someApiCall)
    .branch(subQueue(q('nextQueue')));
q('nextQueue')
    .reduce(continueQueueStuff);
However, queue-flow now has a built-in sub-queue mechanism that allows for a more fluent definition, and explicit branching is no longer necessary.
function subqueueFunc(parentQ) {
    return parentQ
        .map(doSomething)
        .filter(removeUndesirableValues);
}
q('someQueue')
    .node(someApiCall)
    .subqueue(subqueueFunc)
    .reduce(continueQueueStuff);
Beyond the actual data processing flow of your code, queue-flow also has an event system that can be used for more robust monitoring of your data flow, patterned after jQuery's events:
q('someQueue')
    .on('push', function(val) {
        console.log('This is the value that was pushed in: ' + val);
        return doSomethingWith(val);
    });
Just like DOM events, queue-flow events can be cancelled by explicitly returning false (if you return nothing, it assumes true). You can register multiple event handlers for the same event, as well; handlers are called in the order they were registered, and if any handler returns false, evaluation of the remaining handlers stops immediately. And just like the callbacks you provide to the data processing methods of queue-flow, the event handlers can be sync or async.
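As a quick sketch of that behavior (logValue is a hypothetical named function), two handlers on the same 'push' event might look like this, with the second one able to veto the push by returning false:
q('someQueue').on('push', logValue); // runs first, purely a side effect
q('someQueue').on('push', function(val) {
    return val !== null; // returning false here cancels the push
});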
The different kinds of events for queue-flow are: 'push', 'pull', 'empty', 'close', and 'kill'.
Use .kill() with caution!
Finally, queue-flow allows its constructor function to be overridden by another constructor function, enabling different behaviors while keeping the queue-flow syntax essentially unchanged. For instance, sloppy-queue-flow keeps the syntax but isn't actually a queue at all – values are passed through whenever they are completed, not in the order they were passed in – which is good for request-response applications (like web servers).
Using another constructor function is fairly simple.
q('someQueue', someConstructorFunction)
    .map(someMappingFunction)
    .filter(someFilteringFunction)
    ...
All of the anonymous sub-queues directly attached to the initial queue use the provided constructor function automatically. If you only want certain steps to use the constructor function, you can use branch to accomplish that:
q('someQueue')
    .map(someMappingFunction)
    .branch('someSpecialQueue');
q('someSpecialQueue', someConstructorFunction)
    .reduce(someReducingFunction, 'finalQueue', 0);
q('finalQueue')
    .filter(someFilteringFunction)
    ...
queue-flow can be used with any particular software pattern, and most of them concisely. Next we will go into a bit more depth on the various methods of queue-flow, and then show how different patterns are achieved with it.
An each operation takes a set of input values, applies them one at a time to the callback function you provide, and then passes the original values on to the next queue. Nothing is done to any of the input data, but your code can perform some sort of side effect on the data. The signature of the callback function you provide should look like this: function(value, [callback]). The first argument is each value, one at a time, being passed into the function. The second argument, if it exists, will be a callback function your function can call with no arguments to signal that your operation is complete.
queue-flow does not care if the arguments have those exact names; they are just descriptive of what each argument is for. You could just as easily make the function signature function(val, next) and it will work just fine.
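For instance, an async each callback might look like the following sketch, where writeToLog is a hypothetical function that takes its own completion callback:
q('logQueue')
    .each(function(value, callback) {
        writeToLog(value, function() {
            callback(); // signal that this side effect is finished
        });
    });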
A map operation takes a set of input values, applies the same transformation to all of them, and returns an equally-sized set of results. It is a transformation, a mapping, from one value type into another.
In queue-flow, the function signature is: function(value, [callback]). One argument if sync, two if async.
A filter operation takes a set of input values and returns only those values that pass a test, so a function provided to filter must return true or false depending on whether the value should be kept or not. The function signature is the same as map: function(value, [callback]).
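Here is a sketch with a hypothetical existsInDatabase helper, assuming the async form passes its boolean result to the callback, mirroring the async map example earlier:
q('candidates')
    .filter(function(value, callback) {
        existsInDatabase(value, function(found) {
            callback(!found); // keep only values not already stored
        });
    });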
A reduce operation takes a set of input values, and optionally an initial condition value, and reduces the entire set of values into a single value. This is done by passing the current reduction of the set along with the newest value, and what the function returns is taken as the newest reduction of the set. The function signature is: function(reduction, value, [callback]).
While map and filter are relatively easy to understand, reduce has some nuances. You can think of the reducing function as a mathematical operator that takes two values and produces one. For instance, addition: [1, 2, 3, 4, 5] => 1 + 2 + 3 + 4 + 5 = 15. We can see that if we reversed the array, [5, 4, 3, 2, 1] => 5 + 4 + 3 + 2 + 1 = 15, we get the same result in the end. But if we used division instead, 1 / 2 / 3 / 4 / 5 != 5 / 4 / 3 / 2 / 1. Reduction order can matter, and if it does, you need to make sure your data source is in the correct order ahead of time.
Operations that don't care about order are called commutative (strictly speaking, parallel reduction also needs them to be associative), and there's a particular power to these operations because they can be parallelized just like map.
The reduce method in queue-flow takes three arguments, not just the single callback like map and filter. The first argument is the normal callback function run on every item in the set. The second argument is optional and can be one of two types: a final callback that takes a single argument with the result of the reduction, function(result), or a string with the name of the queue you want the result to be passed into. If not provided, reduce creates an anonymous queue that can be chained, like usual. The third argument is also optional and is the initial value reduction should have when calling your callback for the first time; it defaults to undefined.
One final warning about reduce (and every method derived from reduce; they will be called out as specializations of reduce): the final callback or named queue will only be given the result once the queue is closed. Queues created with a fixed array, like q([1, 2, 3, 4, 5]), close automatically, but named queues, like q('someQueue'), do not, since they have no values enqueued on construction and would otherwise close before you could even use them. You have to either explicitly call q('someQueue').close() somewhere in your code when you are ready to get the final result of reduce (and reduce-like methods), or explicitly call q('someQueue').closeOnEmpty() after you have finished enqueueing all of the data you intend to process in that queue.
Queues that are closing close their “directly-connected” sub-queues, as well, so:
q('someQueue')
    .map(someMap)
    .filter(someFilter)
    .reduce(someReduction, finalCallback, 0);
q('someQueue')
    .push(val1, val2, val3)
    .closeOnEmpty();
This will cause finalCallback to eventually be called once 'someQueue' is empty, as map will then pass the close command on to filter, which will pass it on to reduce.
every and some are specializations of reduce, although conceptually they are similar to filter. They take two arguments: a callback function to be run on every value (with caveats explained below) and a final function or named queue string to pass the result into.
The callback function is expected to be like the callback function for filter, in that it returns a true or false. Unlike filter, the final value out of every and some is also a true or false.
The difference between these two functions is this: with every, every single call to the callback function must return true for it to return true; if any one of the calls returns false, the whole result is false. Conversely, some returns true if any call to the callback function returns true, and returns false only if all of the calls return false.
Because both of these functions can produce their final result the moment a single call returns the deciding value, they can "short-circuit" and stop evaluation when that occurs, so there is no guarantee that the callback will run on every item in your queue. Do not rely on every and some for any side effects; use each for that, since that's what it was intended for.
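As a sketch, here is some used to answer a yes/no question about a whole queue, with the boolean result pushed into a hypothetical 'hasBigNumbers' queue:
q('numbers')
    .some(function(val) {
        return val > 100;
    }, 'hasBigNumbers');
q('hasBigNumbers')
    .each(console.log); // eventually logs true or false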
toArray is also a specialization of reduce, and takes only one argument: a final callback function or the name of a queue. As its name implies, it takes the values passed into it as a queue and returns them as an array in the order they were received, once it knows it is receiving no more values (the parent queue has closed).
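A minimal sketch: since a queue built from a fixed array closes automatically, toArray fires as soon as the values have flowed through:
q([3, 1, 2])
    .map(function(val) { return val * 10; })
    .toArray(function(arr) {
        console.log(arr); // [30, 10, 20]
    });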
This is sort of the inverse of toArray. Flatten takes an array and enqueues its values one at a time. Actually, any time it encounters an array it enters it and starts enqueuing the non-array values, so [1, [2, [3, [4, [5]]]]] enqueues as you'd imagine. This behavior can be overridden by its single, optional argument, an integer for the depth flatten should traverse into to enqueue. The exterior array is counted as depth 0, so setting the depth to 3 would convert [1, [2, [3, [4, [5]]]]] into the queue 1, 2, 3, 4, [5].
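Reusing the nested array from the description above, a quick sketch (toArray is only there to show the resulting queue contents):
q([1, [2, [3, [4, [5]]]]])
    .flatten()
    .toArray(console.log); // [1, 2, 3, 4, 5]
q([1, [2, [3, [4, [5]]]]])
    .flatten(3)
    .toArray(console.log); // [1, 2, 3, 4, [5]]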
Since significant discussion has already occurred surrounding branch, this will be short. branch can accept a string as input, in which case it enqueues all incoming results into the named queue. If instead it is given an array of names, it will enqueue a copy into each listed queue. Once the queue branch is pulling from closes, it will NOT close the queue(s) it has been pushing values into: since they are named queues and not directly tied to each other anonymously, more than one queue could be feeding results into them. If you want to close that queue, do the following:
q('someQueue')
    .branch('someOtherQueue')
    .on('close', q.Q.prototype.close.bind(q('someOtherQueue')));
or
q('someQueue')
    .branch('someOtherQueue')
    .on('close', function() {
        q('someOtherQueue').close();
    });
(the second is safer if you don’t know which constructor function was used to create the queue).
This is the most complex method in queue-flow. node has traits of map, filter, and branch grafted together, with a special behavior for values that are arrays so that it works particularly well with Node.js-style APIs. Specifically, APIs that follow this convention:
someApi.method(arg1, arg2, .. argN, callback);
// where callback has the following signature
function(error, result)
Most Node.js APIs follow this convention, and therefore most can be passed into node directly. Note the arg1, arg2, .. argN. If the enqueued value is an array of values, those values are treated as the list of arguments (plus the callback) to provide to the API. A non-array value will be passed in as the only argument (or one of two if an async function).
node itself takes two arguments. The first is a function that conforms to the above convention, and the second is a multi-type variable that determines what it will do if the API call returns an error:
- If it is falsy (undefined, null, 0, "", false), the error is simply ignored and the next enqueued item is processed.
- If it is true, the error signals the immediate closing of the queue and all sub-queues.
- If it is a callback function or the name of a queue, the error is passed into it, and nothing is passed on by node that time around.
If there was no error, the result is placed into the next anonymous queue.
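As a sketch with a real Node.js API (fs.readFile), assuming the error-handling argument accepts a queue name as described above ('errorQueue' is just an illustrative name):
var fs = require('fs');
q(['a.txt', 'b.txt'])
    .map(function(filename) { return [filename, 'utf8']; }) // build the argument list for readFile
    .node(fs.readFile, 'errorQueue')
    .each(console.log); // the file contents, as each read completes
q('errorQueue')
    .each(console.error); // any read errors end up here instead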
Similarly, promise works with CommonJS Promises (called Deferred Objects by jQuery). In this case, a function that creates the promise is passed into the promise method. The value or array of values passed into promise is used as the arguments for this promise constructor. When the promise is resolved, the value is passed on to the next stage in the queue. If it was rejected with an error, it functions in a similar fashion to node: the error can be ignored, sent into a callback, or sent into another queue.
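For example, a sketch using a jQuery-style promise, where $.getJSON returns a promise and each enqueued URL (the endpoints here are made up) becomes the argument to the constructor function:
q(['/api/user/1', '/api/user/2'])
    .promise(function(url) {
        return $.getJSON(url); // resolves with the parsed JSON body
    })
    .each(console.log); // each resolved value, as it arrives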
This has been an overview of the most important data processing methods of queue-flow. For a complete listing of all methods, see the source documentation.
Branching is just branch, right? Not exactly. branch is analogous to an if statement, but how would you accomplish the following?
if(foo) {
    val = bar();
} else {
    val = baz();
}
another(val);
See the statement below the if statement that relies on the result from either of the two methods called above? We want to branch between two queues, but then use the result from either queue with the same method.
One way could be:
q('someQueue')
    .branch(foo);
q('bar')
    .map(bar)
    .map(another);
q('baz')
    .map(baz)
    .map(another);
But that’s not really good code reuse (although in this simple example it’ll be fewer lines of code). A better way is to rejoin the branches into another queue:
q('someQueue')
    .branch(foo);
q('bar')
    .map(bar)
    .branch('anotherQueue');
q('baz')
    .map(baz)
    .branch('anotherQueue');
q('anotherQueue')
    .map(another);
Recursive loops are easy with named queues:
q('someQueue')
    .map(foo)
    .branch('someQueue');
But look out! That's an infinite loop. You also need a filter.
q('someQueue')
    .map(foo)
    .filter(untilExitCondition)
    .branch('someQueue');
If you need for loop style behavior, you can accomplish something like that with map and exec:
// Number of times to run
q(new Array(10))
    // Turn the array into the function you want to run
    .map(function() { return funcToRun; })
    // Execute your code the desired number of times
    // with the desired arguments
    .exec(arg1, arg2);
With these building blocks you can implement any kind of algorithm with queue-flow. For some more complex examples, check out the examples included with queue-flow.