WebDirections Code 2014: Asynchronous JavaScript Patterns   (@rvagg)

Asynchronous JavaScript Patterns

This work is © 2014 Rod Vagg and is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Australia License

Embrace the Async

@rvagg
Rod Vagg


Erik

Milky Joe


http://r.va.gg
http://nodefirm.com

TOC

Defining "asynchronous programming"

Constructing basic async abstractions: each, map, series

Advanced abstractions

  • EventEmitter
  • Streams
  • Promises

Generators for async programming

Ranty stuff

In the begining


Programming was about computing

Computers were self-contained number-crunchers

I/O was late to the game

I/O is not computing


I/O is what computers wait for

I/O is the bottleneck in the majority of programs

I/O is usually hidden

System.out.println("Reading file...");
BufferedReader br = new BufferedReader(new FileReader("in.txt"));

try {
  StringBuilder sb = new StringBuilder();
  String line;

  while ((line = br.readLine()) != null)
    sb.append(line + "\n");
  System.out.print(sb.toString());
} finally {
  br.close();
}

System.out.println("Finished reading file!");

The programmer isn't prompted to consider the costs

I/O is expensive

ClassOperationTime cost
Memory L1 cache reference: 1 ns
L2 cache reference: 4 ns
Main memory reference: 100 ns
I/O SSD random-read: 16,000 ns
Round-trip in same datacenter: 500,000 ns
Physical disk seek: 4,000,000 ns
Round-trip from AU to US: 150,000,000 ns

The UI introduced new challenges


I/O in the form of a human

Unpredictable

Very high latency

Sequential programming has limits

Good UIs don't constrain the user

Networked UIs: a perfect storm


User variability delivered over unreliable, high-latency networks

Browsers...

On the web, nothing is synchronous


Always in reactive mode, responding to external events

  • React to user events
  • React to network events
  • React to browser events

JavaScript:
The King of event-driven programming

Born in the browser

Designed to respond to user-events

Timers!   BREAKING NEWS: Scrolling text with DHTML!

var p = document.getElementById('scroll');
function scroll () {
  var ptxt = p.innerHTML.replace(/ /g, ' ').split('');
  ptxt.push(ptxt.shift());
  p.innerHTML = ptxt.join('').replace(/ /g, ' ');
}
setInterval(scroll, 100)

Warning: content may trigger intense feelings of nostalgia

JavaScript:
The King of event-driven programming


  • First-class functions

  • Closures and scope capturing

  • Single-threaded

Asynchronicity at the extreme: Node.js


Async works well for performing many complex, parallel tasks in the browser, why not on the server too?

Synchronous I/O with Node.js

Synchronous file system I/O, on the JavaScript thread

console.log('Reading data...');
var data = fs.readFileSync('in.dat');
console.log('Finished reading data!');

Don't do this

Asynchronous I/O with Node.js

File system I/O is performed on a thread-pool when asynchronous

console.log('Reading data...');
fs.readFile('in.dat', function (err, data) {
  // asynchronous
  console.log('Finished reading data!');
})
console.log('Not finished reading data...');

Asynchronous I/O with Node.js


Network I/O performed with non-blocking system calls

epoll or select depending on platform

Asynchronous I/O with Node.js

Network I/O, always asynchronous, generally event-based

var server = http.createServer()

server.on('request', function (request, response) {
  // handle HTTP request
});
server.on('clientError', function () { /* ... */ }});
server.on('error', function () { /* ... */ }});
server.on('close', function () {
  console.log('Server shut down');
})
server.on('listening', function () {
  console.log('Listening on port 8080');
});

server.listen(8080);

The callback

JavaScript embraces the continuation passing style

console.log('Ping!');
function pong () { console.log('Pong!'); }
setTimeout(pong, 100);
console.log('Reading file...');
fs.readFile('in.dat', 'utf8', function (err, data) {
  console.log('in.dat contains %d lines', data.split('\n').length);
});
function clickHandler () { alert('Yo!'); }
el.addEventListener('click', clickHandler, false);

The callback

The callback function is the fundamental unit of asynchronous programming in JavaScript

  • DOM events
  • Browser-based networking
  • Basic Node.js I/O operations
  • Node.js EventEmitter
  • Streams

Even Promises and async utilities for generators are built on callbacks

Basic abstractions

Basic abstractions: callback counting

Example: a program to ls -ta

Tools:

// list files in a directory
fs.readdir(directory, function (err, fileList) { /* ... */ });

// perform a `stat` on a file
fs.stat(filename, function (err, stat) { /* ... */ });

Basic abstractions: callback counting

// v1: print the modification times of each file
fs.readdir(dir, function (err, fileList) {
  if (err) throw err;

  fileList.forEach(function (file) {
    fs.stat(dir + '/' + file, function (err, stat) {
      if (err) return console.error('Error stating %s', file);

      console.log('%s: %s', file, stat.mtime);
    });
  });
});

Basic abstractions: callback counting

// v2: collect modification times
fs.readdir(dir, function (err, fileList) {
  if (err) throw err;

  var times = {};
  fileList.forEach(function (file) {
    fs.stat(dir + '/' + file, function (err, stat) {
      if (err) return console.error('Error stating %s', file);

      times[file] = stat.mtime;
      console.log(times);
      // now what?
    });
  });
});

Basic abstractions: callback counting

// v3: collect modification times and count callbacks
fs.readdir(dir, function (err, fileList) {
  if (err) throw err;

  var times = {}, count = 0;
  fileList.forEach(function (file) {
    fs.stat(dir + '/' + file, function (err, stat) {
      if (err) {
        console.error('Error stating %s', file);
      } else {
        times[file] = stat.mtime;
      }
      if (++count === fileList.length)
        console.log(times);
    });
  });
});

Basic abstractions: callback counting

// v4: collect modification times, count callbacks, sort and print
fs.readdir(dir, function (err, fileList) {
  if (err) throw err;
  var times = {}, count = 0;
  fileList.forEach(function (file) {
    fs.stat(dir + '/' + file, function (err, stat) {
      if (!err) times[file] = stat.mtime;
      if (++count === fileList.length) {
        fileList.sort(function (a, b) {
          if (times[a] > times[b]) return -1;
          if (times[a] < times[b]) return 1;
          return 0;
        });
        console.log(fileList.join('\n'));
      }
    });
  });
});

Basic abstractions: callback counting

// v5: reorganise
fs.readdir(dir, function (err, fileList) {
  if (err) throw err;
  var times = {}, count = 0;
  function sortFn (a, b) {
    return times[a] > times[b] ? -1 : times[a] < times[b] ? 1 : 0;
  }
  function sortAndPrint () {
    console.log(fileList.sort(sortFn).join('\n'));
  }
  fileList.forEach(function (file) {
    fs.stat(dir + '/' + file, function (err, stat) {
      if (!err) times[file] = stat.mtime;
      if (++count === fileList.length) sortAndPrint();
    })
  });
});

Basic abstractions: callback counting

// a handy utility!
function counter (limit, callback) {
  var count = 0;
  return function () {
    count++;
    if (count === limit)
      callback();
  }
}
var done = counter(4, function () {
  console.log('DONE!');
});
done();
done();
done();
done();

Basic abstractions: callback counting

// v6: introduce counting abstraction
fs.readdir(dir, function (err, fileList) {
  if (err) throw err;
  var times = {};
  var done = counter(fileList.length, sortAndPrint);
  function sortFn (a, b) {
    return times[a] > times[b] ? -1 : times[a] < times[b] ? 1 : 0;
  }
  function sortAndPrint () {
    console.log(fileList.sort(sortFn).join('\n'));
  }
  fileList.forEach(function (file) {
    fs.stat(dir + '/' + file, function (err, stat) {
      if (!err) times[file] = stat.mtime;
      done();
    })
  });
});

Basic abstractions: callback counting

// with error handling
function counter (limit, callback) {
  var count = 0;
  var borked = false;
  return function (err) {
    if (borked) return;
    if (err) {
      borked = true;
      return callback(err);
    }
    count++;
    if (count === limit)
      callback();
  }
}

https://github.com/Raynos/after

Basic abstractions: asynchronous map

// synchronous ES5 Array#map()

[ 1, 2, 3 ].map(function (el) {
  return el * 10;
});

// -> [ 10, 20, 30 ]

Basic abstractions: asynchronous map

// Array#map() for async!
function asyncMap (arr, workCallback, callback) {
  var result = [];
  var done = counter(arr.length, function (err) {
    if (err) return callback(err);
    callback(null, result);
  });
  arr.forEach(function (el, i) {
    workCallback(el, function (err, elResult) {
      if (err) return done(err);
      result[i] = elResult;
      done();
    });
  });
}

https://github.com/Raynos/map-async

Basic abstractions: asynchronous map

Example usage:

/* Array#map version:
console.log([ 1, 2, 3 ].map(function (el) {
  return el * 10;
}));
*/

asyncMap(
  [ 1, 2, 3 ],
  function (el, callback) {
    callback(null, el * 10);
  },
  function (err, result) {
    console.log(result);
  }
);

Basic abstractions: asynchronous map

// v7: introduce async map abstraction, remove all explicit state
fs.readdir(dir, function (err, fileList) {
  if (err) throw err;
  function sortAndPrint (err, fileTimes) {
    fileTimes.sort(function (a, b) {
      return a.time > b.time ? -1 : a.time < b.time ? 1 : 0;
    });
    fileTimes.forEach(function (fileTime) {
      console.log(fileTime.file);
    });
  }
  function statFile (file, callback) {
    fs.stat(dir + '/' + file, function (err, stat) {
      callback(null, { file: file, time: stat ? stat.mtime : 0 });
    });
  }
  asyncMap(fileList, statFile, sortAndPrint);
});

Basic abstractions: serial execution

Example: concat multiple files into a single file

Tools:

// read the contents of a file into a String or Buffer
fs.readFile(filename, encoding, function (err, data) { /* ... */ });

// append a String or Buffer to a file
fs.appendFile(filename, buffer, function (err) { /* ... */ });

Basic abstractions: serial execution

// v1: naive version, read and write each file
files.forEach(function (file) {
  fs.readFile(file, 'utf8', function (err, contents) {
    if (err) throw err;

    fs.appendFile(output, contents, function (err) {
      if (err) throw err;
    })
  });
});

Basic abstractions: serial execution

// v2: process files one at a time
function catFile (file, callback) {
  fs.readFile(file, 'utf8', function (err, contents) {
    if (err) return callback(err);
    fs.appendFile(output, contents, callback);
  });
}
function next (index) {
  if (index >= files.length)
    return console.log('Done!');
  catFile(files[index], function (err) {
    if (err) throw err;
    next(index + 1);
  })
}
next(0);

https://github.com/hughsk/async-series

Advanced abstractions


  • EventEmitter

  • Streams

  • Promises

Advanced abstractions: EventEmitter


Sometimes a single callback isn't enough

  • Multiple calls
  • Multiple types of responses
  • Communicate complex state

Advanced abstractions: EventEmitter

// in Node.js
var EventEmitter = require('events').EventEmitter;

var ee = new EventEmitter();
ee.on('ping', function () {
  console.log('pong');
})
ee.emit('ping');

on(), once(), emit(), removeListener(), removeAllListeners()

One surprise: 'error' event

Advanced abstractions: EventEmitter

// sometimes an API should return an EventEmitter
var server = http.createServer()

server.on('request', function (request, response) {
  // handle HTTP request
});
server.on('clientError', function () { /* ... */ }});
server.on('error', function () { /* ... */ }});
server.on('close', function () {
  console.log('Server shut down');
});
server.on('listening', function () {
  console.log('Listening on port 8080');
});

server.listen(8080);

Advanced abstractions: Streams

For

  • Chunked I/O
  • Chunked data processing

Why?

  • Abstraction suited to flowing data: plumbing
  • Process very large amounts of data with minimal memory
  • Process many asynchronous streams of data simultaneously
  • Push output data before input data has completed
  • Backpressure

Advanced abstractions: Streams

var request = require('request');
var json = require('JSONStream');
var through2 = require('through2');
var csv = require('csv-write-stream');
var zlib = require('zlib');
var fs = require('fs');

request({ url: 'https://registry.npmjs.org/-/all' }) // ~35M raw
  .pipe(json.parse('*'))
  .pipe(through2.obj(function (data, enc, callback) {
    if (data.name && data.description)
      this.push({ name: data.name, description: data.description })
    callback()
  }))
  .pipe(csv({ headers: [ 'name', 'description' ] }))
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('npm.csv.gz'))

Advanced abstractions: Promises


An abstraction with a very well-defined API

Promises/A+ http://promisesaplus.com/

Advanced abstractions: Promises


An object with a then() method as a hook in to the state and data represented by the Promise

Three states: pending, fulfilled, rejected

promise.then(onFulfilled, onRejected)

Composable

Advanced abstractions: Promises

var q = require('q');
var preaddir = q.denodeify(fs.readdir);
var pstat = q.denodeify(fs.stat);

function sortAndPrint (fileTimes) {
  fileTimes.sort(function (a, b) {
    return a.time > b.time ? -1 : a.time < b.time ? 1 : 0;
  });
  fileTimes.forEach(function (fileTime) {
    console.log(fileTime.file);
  });
}

preaddir(dir).then(function (fileList) {
  function statFile (file) {
    return pstat(path.join(dir, file)).then(function (stat) {
      return { file: file, time: stat ? stat.mtime : 0 };
    });
  }
  q.all(fileList.map(statFile)).then(sortAndPrint);
});

Advanced abstractions: Promises


What's the catch?

Error-handling as a second-class citizen

Heavy-weight abstraction: can easily mask problems

Viral: infect everything they touch

ES6 Generators


Magic to solve async programming pain?

ES6 Generators


Not actually magic

Not actually an async programming construct

A protocol with some additional syntactic sugar:

  • function* myGenerator () { /* ... */ }
  • yield someValue;

ES6 Generators

A pseudo random number generator (PRNG)

// Linear congruential pseudo random number generator
function random (seed) {
  var m = 25, a = 11, c = 17, z = seed;
  for (var i = 0; i < 10; i++) {
    z = (a * z + c) % m;
    console.log(z);
  }
}

ES6 Generators

A pseudo random number generator generator (PRNGG?)

function* random (seed) {
  var m = 25, a = 11, c = 17, z = seed;
  while (true) {
    z = (a * z + c) % m;
    yield z;
  }
}

ES6 Generators

function* random (seed) {
  var m = 25, a = 11, c = 17, z = seed;
  while (true) {
    z = (a * z + c) % m;
    yield z;
  }
}
var gen = random(100);
console.log(gen.next().value);
console.log(gen.next().value);
console.log(gen.next().value);
console.log(gen.next().value);
console.log(gen.next().value);

ES6 Generators

// PRNG in ES3 using the generator protocol
function random (seed) {
  var m = 25, a = 11, c = 17, z = seed;
  return {
    next: function () {
      z = (a * z + c) % m;
      return { value: z, done: false };
    }
  }
}

ES6 Generators

// PRNG generator with re-seed capabilities
function* random (seed) {
  var m = 25, a = 11, c = 17, z = seed;
  while (true) {
    z = (a * z + c) % m;
    seed = yield z; // if next() is called with a value, it returns here
    if (typeof seed == 'number')
      z = seed;
  }
}

var gen = random(10);
console.log(gen.next().value);
console.log(gen.next().value);
console.log(gen.next().value);
console.log(gen.next(10).value); // re-seed
console.log(gen.next().value);
console.log(gen.next().value);

ES6 Generators

For asynchronous programming?

function timer (callback) {
  setTimeout(function () {
    callback(null, Date.now());
  }, Math.random() * 1000);
}

// take advantage of the suspendable nature of generators
function* timerGen () {
  var v1 = yield timer;
  console.log('v1', v1);
  var v2 = yield timer;
  console.log('v2', v2);
  var v3 = yield timer;
  console.log('v3', v3);
}

ES6 Generators

Requires a helper

function runGenerator (fn) {
  var gen = fn();

  (function next (arg) {
    var it = gen.next(arg);
    if (it.done) return;

    it.value(function (err, data) {
      if (err) return gen.throw(err); // raise exception at `yield`
      next(data);
    });
  }());
}

https://github.com/visionmedia/co

ES6 Generators

Callback-only functions make this practical

function timer (timeout) {
  return function (callback) {
    setTimeout(function () { callback(null, Date.now()); }, timeout);
  }
}

runGenerator(function* timerGen () {
  var v1 = yield timer(1000);
  console.log('v1', v1);
  var v2 = yield timer(2000);
  console.log('v2', v2);
  var v3 = yield timer(3000);
  console.log('v3', v3);
});

https://github.com/visionmedia/node-thunkify

ES6 Generators


What's the catch?

Not widely available

Not well optimised yet

Serial execution by default, parallel requires extra steps

Is it intuitive?

Asynchronous programming



Accept it, it's the new norm