十五、Async.js

总是有新的东西,总是有我没想到的东西,有时并不可怕。—罗伯特·乔丹

协调软件流程可能很麻烦,尤其是当异步操作在不同时间完成时。第 16 章展示了如何用承诺来解决这个问题。本章讨论 Async.js,这是一个回调驱动的 JavaScript 库,它提供了一套强大的函数来管理异步集合操作和控制流。

第 16 章讲述了异步代码可能会出现问题的三种常见流程:顺序流程、并行流程和管道流程。为了用承诺来处理这些流,第 16 章展示了如何用 Q 的助手方法来适应每个面向回调的任务,以便每个任务都可以方便地包装在承诺中。然而,Async.js 库包含了回调驱动的异步编程方法,但是这种方法避免了回调驱动的代码(如嵌套回调)带来的许多缺点。

许多 Async.js 控制流函数遵循类似的模式:

The first argument to each control flow function is typically an array of functions to be executed as tasks. Task function signatures will vary a bit based on the exact Async.js control flow function used, but they will always receive a Node.js-style callback as a last argument.   The last argument to each control flow function is a final callback function to be executed when all tasks are complete. The final control flow function also receives a Node.js-style callback and may or may not receive additional arguments as well.   Note

Node.js 样式的回调只是一个回调函数,它总是将错误作为它的第一个参数。当回调被调用时,要么将一个错误对象作为其唯一的参数传递,要么将null作为错误值传递,并将任何其他值作为附加参数传递。

清单 15-1 展示了这种模式通常是如何应用的。

Listing 15-1. Flow Control Function Pattern

var tasks = [

function (/*0..n args, */ cb) {/*...*/},

function (/*0..n args, */ cb) {/*...*/},

function (/*0..n args, */ cb) {/*...*/}

];

function finalCallback (err, result) {/*...*/};

async.someFlowControlFunction(tasks, finalCallback);

本章的其余部分将研究大量的控制流函数,以及它们与这个通用模式的不同之处。因为所有的流都以相似的方式组织任务和处理错误和值,所以通过对比可以更容易地理解每一个。

Note

Async.js 中 async 的含义与组织异步操作有关。库本身不保证任务函数异步执行。如果开发人员将 Async.js 与同步函数一起使用,每个函数都将同步执行。这条规则有一个半例外。async.memoize()函数(与控制流无关)使函数可缓存,因此后续调用不会实际运行该函数,而是返回缓存的结果。Async.js 强制每个后续调用都是异步的,因为它假设原始函数本身是异步的。

顺序流程

顺序流程是指一系列步骤必须按顺序执行的流程。一个步骤可能直到前一个步骤完成后才开始(除了第一个步骤),如果任何一个步骤失败,整个流程都会失败。清单 15-2 中的函数是更改一个虚构用户密码的步骤,与在第 16 章中引入顺序流程的场景相同。然而,这些步骤略有不同。

首先,每一个都被包装在一个工厂函数中,该函数接受一些初始数据并返回一个基于回调的函数,以用作顺序流中的一个步骤。

第二,第一步(包装在changePassword()函数中的任务)实际上将新凭证作为操作结果传递给它的回调函数。顺序流中的步骤不需要生成结果,但是如果一个步骤确实将结果传递给了它的回调函数,那么它对序列中的其他步骤没有影响。如果某些(或所有)步骤依赖于前面步骤的结果,则需要管道流。(管道将在本章后面讨论。)

Listing 15-2. Sequential Steps

// example-001/async-series.js

'use strict';

var async = require('async');

var userService = require('./user-service');

var emailService = require('./email-service');

var nothingToSeeHere = require('./nothing-to-see-here');

function changePassword(email, password) {

return function (cb) {

process.nextTick(function () {

userService.changePassword(email, password, function (err, hash) {

// new credentials returned as results

cb(null,``{email: email, passwordHash: hash}T2】

});

});

};

}

function notifyUser(email) {

return function (cb) {

process.nextTick(function () {

// the email service invokes the callback with

// no result

emailService.notifyPasswordChanged(email, cb);

});

};

}

function sendToNSA(email, password) {

return function (cb) {

process.nextTick(function () {

// the nothingToSeeHere service invokes the

// callback with no result

nothingToSeeHere.snoop(email, password, cb);

});

}

}

在清单 15-3 中,每个工厂函数都用它的初始数据执行,返回添加到一个steps数组中的任务函数。这个数组成为async.series()的第一个参数,后面是一个最终的回调函数,它接收序列执行过程中产生的任何错误,或者是序列中每一步产生的结果数组。如果生成了任何结果,它们将按照它们在steps数组中对应步骤的顺序存储。例如,changePassword()的结果将是results数组中的第一个元素,因为changePassword()作为第一个任务被调用。

Listing 15-3. Sequential Flow

// example-001/async-series.js

var email = 'user@domain.com';

var password = 'foo!1';

var steps = [

//returns function(cb)

changePassword(email, password),

//returns function(cb)

notifyUser(email),

//returns function(cb)

sendToNSA(email, password)

];

async.series(steps, function (err, results) {

if (err) {

return console.error(err);

}

console.log('new credentials:', results[0]);

});

因为这些步骤是异步的,所以不能像调用同步函数那样一次调用一个。但是 Async.js 在内部跟踪每个步骤的执行,只有在调用了前一个步骤的回调时才调用下一个步骤,因此创建了一个顺序流。如果顺序流中的任何步骤向其回调传递了一个错误,该系列将被中止,并且最后一个系列回调将因该错误而被调用。当出现错误时,results值将未定义。

本章中使用的工厂函数是向每个步骤传递初始数据的便捷方式,但它们不是必需的。为了支持 JavaScript 的本地函数绑定工具,工厂可以被删除,如清单 15-4 所示,但是当步骤被实际添加到数组中时,代码变得更加难以阅读。对于不需要初始数据或绑定的简单场景,匿名任务函数可以直接在steps数组中声明。(但是,以一种促进可读性和可维护性的方式命名和声明函数总是一个好主意。)

Listing 15-4. Series Steps with Argument Binding

function changePassword(email, password, cb) {/*...*/}

function notifyUser(email, cb) {/*...*/}

function sendToNSA(email, password, cb) {/*...*/}

var steps = [

changePassword.bind(null, email, password),

notifyUser.bind(null, email),

sendToNSA.bind(null, email, password)

];

在本章的剩余部分,我们将使用工厂函数而不是bind(),但是开发者可以自由选择他们觉得最自然的方法。

平行流

有时,并行运行独立的任务,然后在所有任务完成后汇总结果会很有帮助。JavaScript 是一种异步语言,因此它没有真正的并行性,但连续调度长时间的非阻塞操作将释放事件循环来处理其他操作(如浏览器环境中的 UI 更新,或服务器环境中的额外请求)。可以在事件循环的一个回合中调度多个异步任务,但是无法预测每个任务将在未来的哪个回合完成。这使得从每个任务中收集结果并将它们返回给调用代码变得困难。幸运的是,async.parallel()函数为开发人员提供了这样做的方法。

清单 15-5 展示了两个包装 jQuery GET 请求的函数。第一个获取给定userID的用户数据,第二个获取美国各州的列表。很容易想象,这些功能可能是用户个人资料页面的一部分,用户可以在该页面上更新电话号码、邮政地址等个人信息。当页面加载时,一次性获取这些信息是有意义的。不过,这是两个不同的 API 调用,所以即使它们被同时调度,结果也需要在将来的某个时间点进行处理。

Listing 15-5. Parallel Steps

// example-002/views/async-parallel.html

function getUser(userID) {

return function (cb) {

$.get('/user/' + userID).then(function (user) {

cb(null, user);

}).fail(cb);

};

}

function getUSStates(cb) {

$.get('/us-states').then(function (states) {

cb(null, states);

}).fail(cb);

}

在清单 15-6 中,Async.js 被导入到一个带有标准<script>标签的虚构网页中。使用async.parallel()函数调度任务,像async.series()一样,它接受一组要执行的任务函数和一个接收错误或聚合结果的最终回调函数。并行任务只是接受单个回调参数的函数,一旦任务函数中的异步操作完成,就应该调用这个回调参数。所有回调都符合 Node.js 回调约定。

清单 15-6 中的getUser()函数是一个工厂,它接受一个userID参数并返回一个接受常规 Node.js 风格回调的函数。因为getUSStates()没有实际参数,所以不需要包装在工厂函数中,而是直接使用。

这两个函数都使用 jQuery 的 AJAX API 获取数据。AJAX promises 将数据从成功的 AJAX 调用传递给传递给 promise 的then()方法的任何回调,而将错误传递给传递给 promise 的fail() method方法的任何回调。因为fail()回调的签名接受单个错误参数,所以从 Async.js 传递给每个任务的回调也可以用作对fail()的回调。

Listing 15-6. Parallel Flow

<!-- example-002/views/async-parallel.html -->

<h1>User Profile</h1>

<form>

<fieldset>

<div>

<label>First Name</label>

<input type="text" id="first-name" />

</div>

<div>

<label>US States</label>

<select id="us-states"></select>

</div>

</fieldset>

</form>

<script>

(function (async, $) {

function getUser(userID) {

return function (cb) {

$.get('/user/' + userID).then(function (user) {

cb(null, user);

}).fail(cb);

};

}

function getUSStates(cb) {

$.get('/us-states').then(function (states) {

cb(null, states);

}).fail(cb);

}

var userID = 1001;

async.parallel([

getUser(userID),

getUSStates

], function (err, results) {

if (err) {

return alert(err.message);

}

var user = results[0],

states = results[1];

$('#first-name').val(user.firstName);

// ...

$('#us-states').append(states.map(function (state) {

return $('<option></option>')

.html(state)

.attr('value', state);

}));

});

}(window.async, window.jQuery));

</script>

Async.js 库将遍历tasks数组中的每个任务,一个接一个地调度它们。当每个任务完成时,它的数据被存储,一旦所有任务完成,传递给async.parallel()的最后一个回调被调用。

结果按照传递给async.parallel()的任务的顺序排序,而不是按照任务实际解决的顺序。如果任何并行任务中出现错误,该错误将被传递给最终回调,所有未完成的并行任务一旦完成将被忽略,最终回调中的results参数将为undefined

管道流动

当一系列任务中的每个任务都依赖于前一个任务的值时,就需要一个管道流(或瀑布)。清单 15-7 表示一个虚构的公司奖励计划的任务,其中计算用户的年龄(基于出生日期),如果用户的年龄达到一定的阈值,用户将获得现金奖励。

每个函数接收一些输入,然后将一些输出传递给它的回调函数。每个函数的输出成为系列中下一个函数的输入。

The getUser() factory function accepts a userID and returns another function that, when invoked, looks up a user record. It passes the user record to its callback.   The calcAge() function accepts a user argument and invokes its callback with the calculated age of the user.   The reward() function accepts a numeric age argument and invokes its callback with the selected reward if the age meets certain thresholds.   Listing 15-7. Waterfall (Pipeline) Steps

// example-003/callback-waterfall

'use strict';

var db = require('./database');

function getUser(userID, cb) {

process.nextTick(function () {

// pass cb directly to find because

// it has the same signature:

// (err, user)

db.users.find({id: userID}, cb);

});

}

function calcAge(user, cb) {

process.nextTick(function () {

var now = Date.now(),

then = user.birthDate.getTime();

var age = (now - then) / (1000 * 60 * 60 * 24 * 365);

cb(null, Math.round(age));

});

}

function reward(age, cb) {

process.nextTick(function () {

switch (age) {

case 25: return cb(null, '$100');

case 35: return cb(null, '$150');

case 45: return cb(null, '$200');

default: return cb(null, '$0');

}

});

}

如果用嵌套的回调来组织,这个管道将会相当难看并且难以维护。如果奖励计划中增加了额外的步骤,就需要对代码进行梳理和重组,以适应流水线流程中的新步骤。捕获错误并通过回调传播它们也是手动进行的。清单 15-8 中的示例代码展示了在没有 Async.js 的情况下如何运行这些任务。

Listing 15-8. A Waterfall of Nested Callbacks

// example-003/callback-waterfall

function showReward(userID, cb) {

getUser(userID, function (err, user) {

if (err) {

return cb(err);

}

calcAge(user, function (err, age) {

if (err) {

return cb(err);

}

reward(age, cb);

});

})

}

showReward(123, function (err, reward) {

if (err) {

return console.error(err);

}

console.log(reward);

});

幸运的是,Async.js 使得组织一个既可维护又能优雅地处理错误的管道流变得相对容易。清单 15-9 中的代码使用async.waterfall()来组织要执行的一系列任务,然后提供一个最终回调来捕获管道任务引发的任何错误,或者在没有错误发生的情况下接收最终的reward值。

Listing 15-9. Waterfall (Pipeline) Flow

// example-003/async-waterfall.js

'use strict';

var async = require('async');

var db = require('./database');

function getUser(userID) {

// using a factory function to pass in

// the userID argument and return another

// function that will match the callback

// signature that async.waterfall expects

return function (cb) {

process.nextTick(function () {

// pass cb directly to find because

// it has the same signature:

// (err, user)

db.users.find({id: userID}, cb);

});

};

}

// the calcAge and reward functions

// do not change

async.waterfall([

getUser(1000),

calcAge,

reward

], function (err, reward) {

if (err) {

return console.error(err);

}

console.log('reward:', reward);

});

async.series()async.parallel()一样,在任何瀑布任务中传递给回调的错误将立即暂停管道并调用带有错误的最终回调。

重复使用管道

管道对于处理数据非常有帮助,以至于async.seq()会采用一系列函数,就像async.waterfall()一样,并将它们组合成一个单一的、可重用的管道函数,可以多次调用。当然,这可以通过使用闭包来包装async.waterfall()来手动完成,但是async.seq()是一个方便的函数,可以省去开发人员的麻烦。

清单 15-10 显示了一系列用于处理虚拟手机账单的函数。createBill()函数接受一个调用计划,并用该计划和正常月费率创建一个bill对象。carrierFee()在这个数额上追加一大块零钱,只是因为电话公司可以这么做。prorate()功能随后确定是否将一些金额记入用户的贷方(例如,如果用户在计费周期的中间开始了新的计划)。最后,govtExtortion()会在交付账单之前将计算好的税款附加到账单上。

Listing 15-10. Sequence (Pipeline) Steps

// example-004/async-seq.js

'use strict';

var async = require('async');

var dateUtil = require('./date-util');

function createBill(plan, cb) {

process.nextTick(function () {

var bill = {

plan: plan,

total: plan.billAmt

};

cb(null, bill);

});

}

function carrierFee(bill, cb) {

process.nextTick(function () {

bill.total += 10;

cb(null, bill);

});

}

function prorate(bill, cb) {

if (!bill.plan.isNew) {

return cb(null, bill);

}

process.nextTick(function () {

bill.plan.isNew = false;

var days = dateUtil().daysInMonth();

var amtPerDay = bill.plan.billAmt / days;

var prorateAmt = ((bill.plan.billDay - 1) * amtPerDay);

bill.total -= prorateAmt;

cb(null, bill);

});

}

function govtExtortion(bill, cb) {

process.nextTick(function () {

bill.total = bill.total * 1.08;

cb(null, bill);

});

}

async.seq()创建管道与使用async.waterfall()非常相似,如清单 15-11 所示。主要区别在于,async.seq()并不立即调用这些步骤,而是返回一个pipeline()函数,该函数将用于稍后运行任务。pipeline()函数接受将被传递到第一步的初始参数,消除了在定义管道时工厂函数或绑定值到第一步的需要。此外,与大多数其他async函数不同,async.seq()是可变的(接受可变数量的参数)。它不接受像async.waterfall()这样的任务数组,而是接受每个任务函数作为参数。

在清单 15-11 中,pipeline()函数被创建,然后用两个参数调用:一个plan对象,它将被传递给createBill(),以及一个最终回调,为用户接收一个错误或最终bill对象。

Listing 15-11. Sequence (Pipeline) Flow

// example-004/async-seq.js

var pipeline = async.seq(

createBill,

carrierFee,

prorate,

govtExtortion

);

var plan = {

type: 'Lots of Cell Minutes Plan!+',

isNew: true,

billDay: 15,

billAmt: 100

};

pipeline(plan, function (err, bill) {

if (err) {

return console.error(err);

}

//bill = govtExtortion(prorate(carrierFee(createBill(plan))))

console.log('$', bill.total.toFixed(2));

});

环路流量

重复直到满足某种条件的流称为循环。Async.js 有几个循环函数,帮助协调要执行的异步代码和要在其中测试的条件。

当某些条件保持为真时循环

前两个函数async.whilst()async.doWhilst(),类似于许多编程语言中众所周知的whiledo/while循环结构。当某个条件评估为真时,每个循环运行。一旦条件评估为假,循环停止。

async.whilst()async.doWhilst()功能几乎相同,除了async.whilst()在循环中的任何代码运行之前执行条件评估,而async.doWhilst()在执行条件评估之前执行循环的一次迭代。async.doWhilst()中的循环代码保证至少运行一次,而如果初始条件为假,则async.whilst()中的循环代码可能根本不会运行。

清单 15-12 显示了async.whilst()被用来调用一个 API 十次,以获得某个竞赛的随机“获胜者”。在循环运行之前,会检查一个姓名数组,以确定是否已经选出了 10 名获胜者。重复这个过程,直到数组的长度为 10。如果在循环中的一个 API 调用过程中出现错误,那么async.whilst()流将被终止,最后一个回调将被调用,并显示错误;否则,一旦循环条件评估为 false,将调用最终回调。

Listing 15-12. Looping While Some Condition Remains True

<!-- example-005/views/async-whilst.html -->

<h1>Winners!</h1>

<ul id="winners"></ul>

<script>

(function (async, $) {

function pickWinners(howMany, cb) {

var winners = [];

async.whilst(

// condition test:

// continue looping until we have enough winners

function () { return winners.length < howMany; },

// looping code

function (cb) {

$.get('/employee/random').done(function (employee) {

var winner = employee.firstName + ' ' + employee.lastName;

// avoid potential duplicates

if (winners.indexOf(winner) < 0) {

winners.push(winner);

}

cb(null);

}).fail(function (err) {

cb(err);

});

},

// final callback

function (err) {

// if there is an error just ignore it

// and pass back an empty array, otherwise

// pass the winners

cb(null, err ? [] : winners);

}

);

}

pickWinners(3, function (err, winners) {

$('ul#winners').append(winners.map(function (winner) {

return $('<li></li>').html(winner);

}));

});

}(window.async, window.jQuery));

</script>

清单 15-13 中的代码显示了使用async.doWhilst()代替async.whilst()循环的简短修改。请注意,参数的顺序已经改变。循环函数现在是async.doWhilst()的第一个参数,条件测试是第二个。这在结构上反映了do/while循环语法。

Listing 15-13. Looping Once and Then Continuing While Some Condition Remains True

<!-- example-005/views/async-dowhilst.html -->

<h1>Winners!</h1>

<ul id="winners"></ul>

<script>

(function (async, $) {

function pickWinners(howMany, cb) {

var winners = [];

async.doWhilst(

// looping code

function (cb) {

$.get('/employee/random').done(function (employee) {

var winner = employee.firstName + ' ' + employee.lastName;

// avoid potential duplicates

if (winners.indexOf(winner) < 0) {

winners.push(winner);

}

cb(null);

}).fail(function (err) {

cb(err);

});

},

// condition test is now the second function

// argument

function () { return winners.length < howMany; },

// final callback

function (err) {

// if there is an error just ignore it

// and pass back an empty array, otherwise

// pass the winners

cb(null, err ? [] : winners);

}

);

}

pickWinners(3, function (err, winners) {

$('ul#winners').append(winners.map(function (winner) {

return $('<li></li>').html(winner);

}));

});

}(window.async, window.jQuery));

</script>

循环,直到某个条件变为假

async.whilst()async.doWhilst()函数密切相关的是async.until()async.doUntil()函数,它们遵循相似的执行模式,但不是在某些条件为真时执行循环,而是执行循环直到某些条件测试为假。

清单 15-14 中的代码展示了如何在浏览器中创建一个简单的 HTTP 心跳来测试 API 端点的可用性。Heartbeat()构造函数用async.until()创建一个循环,该循环将重复执行,直到_isStopped属性的值被设置为true. Heartbeat()为止,该函数公开了一个stop()方法,当该方法在对象创建后被调用时,将阻止循环继续。循环的每一轮都向服务器发出 HTTP 请求,如果请求成功,循环将isAvailable属性设置为true;如果失败,则将isAvailable设置为false。为了创建循环迭代之间的延迟,一个setTimeout()函数将回调调用包装在循环中,安排循环的未来迭代在稍后运行(在本例中是每三秒钟一次)。

Listing 15-14. Looping Until Some Condition Becomes False

<!-- example-006/views/async-until.html -->

<section id="output"></section>

<script>

(function (async, $) {

var output = document.querySelector('#output');

function write() {

var pre = document.createElement('pre');

pre.innerHTML = Array.prototype.join.call(arguments, ' ');

output.appendChild(pre);

}

function Heartbeat(url, interval) {

var self = this;

this.isAvailable = false;

this.isStopped = false;

this.writeStatus = function () {

write(

'> heartbeat [isAvailable: %s, isStopped: %s]'

.replace('%s', self.isAvailable)

.replace('%s', self.isStopped)

);

};

async.until(

// test condition

function () { return self.isStopped; },

// loop

function (cb) {

$.get(url).then(function () {

self.isAvailable = true;

}).fail(function () {

self.isAvailable = false;

}).always(function () {

self.writeStatus();

// delay the next loop by scheduling

// the callback invocation in the

// future

setTimeout(function () {

cb(null);

}, interval);

});

},

// final callback

function (/*err*/) {

self.isAvailable = false;

self.writeStatus();

}

);

}

Heartbeat.prototype.stop = function () {

this.isStopped = true;

};

var heartbeat = new Heartbeat('/heartbeat', 3000);

setTimeout(function () {

// 10 seconds later

heartbeat.stop();

}, 10000);

}(window.async, window.jQuery));

</script>

async.doUntil()函数的行为类似于async.doWhilst():它在评估测试条件之前首先运行循环。它的签名也交换了测试条件函数和循环函数的顺序。

重试循环

循环的一个常见用例是重试循环,在这种情况下,任务会被尝试给定的次数。如果任务失败,但没有达到重试限制,它会再次执行。如果达到重试限制,任务将中止。async.retry()函数通过为开发人员处理重试逻辑来简化这个过程。建立循环就像指定重试限制、要执行的任务以及处理错误或接收结果的最终回调一样简单。

清单 15-15 演示了一个简单的 API 调用,用于在某场音乐会或电影中预订座位。可用座位按从最优先到最不优先的顺序排列。执行限制是数组的长度。每次任务运行时,它都会移动数组,从集合中删除第一个(最可取的)座位。如果预订失败,它将继续这个过程,直到没有剩余的座位。

Listing 15-15. Retry Loop

<!-- example-007/views/async-retry -->

<section id="output"></section>

<script>

(function (async, $) {

var output = document.querySelector('#output');

function write() {

var pre = document.createElement('pre');

pre.innerHTML = Array.prototype.join.call(arguments, ' ');

output.appendChild(pre);

}

function reserve(name, availableSeats) {

console.log(availableSeats);

return function (cb) {

var request = {

name: name,

seat: availableSeats.shift()

};

write('posting reservation', JSON.stringify(request));

$.post('/reservation', request)

.done(function (confirmation) {

write('confirmation', JSON.stringify(confirmation));

cb(null, confirmation);

}).fail(function (err) {

cb(err);

});

};

}

var name = 'Nicholas';

var availableSeats = ['15A', '22B', '13J', '32K'];

async.retry(

availableSeats.length,

reserve(name, availableSeats),

function (err, confirmation) {

if (err) {

return console.error(err);

}

console.log('seat reserved:', confirmation);

}

);

}(window.async, window.jQuery));

</script>

每次任务运行时,它都会调用回调函数。如果任务成功并向回调传递了一个值,那么最后的async.retry()回调将使用该值被调用(在本例中为confirmation)。如果出现错误,将重复循环,直到达到循环限制。最后一个错误被传递给最终回调;除非手动累积,否则之前的误差会丢失。清单 15-16 展示了一种潜在的方法,通过收集数组中的错误,然后将数组本身作为err参数传递给回调函数。如果重试循环失败,最终回调的错误将是在循环的每一轮中生成的每个错误的数组。

Listing 15-16. Accumulating Errors in a Retry Loop

function reserve(name, availableSeats) {

var errors = [];

return function (cb) {

// ...

$.post('/reservation', body)

.done(function (confirmation) {

cb(null, confirmation);

}).fail(function (err) {

errors.push(err);

cb(errors);

});

};

}

无限循环

无限循环在同步编程中是个坏消息,因为它们会阻止 CPU 和任何其他代码的执行。但是异步无限循环没有这个缺点,因为像所有其他代码一样,它们被 JavaScript 调度器安排在事件循环的未来循环中。其他需要运行的代码可以“插嘴”并请求调度。

可以用async.forever()调度无限循环。该函数将任务函数作为第一个参数,将最终回调函数作为第二个参数。该任务将继续无限期运行,除非它向其回调传递一个错误。使用等待时间为 0 的setTimeout()setImmediate()连续调度异步操作会在一个循环中产生几乎没有响应的代码,所以最好用更长的等待时间填充每个异步任务,至少几百毫秒。

清单 15-17 中的循环在无限循环的每一次循环中都发出一个 HTTP GET 请求,为用户的仪表板加载股票信息。每次 GET 请求成功时,股票信息被更新,循环在再次执行前等待三秒钟。如果在循环过程中出现错误,则使用错误调用最后一个回调,并终止循环。

Listing 15-17. Infinite Loop

<!-- example-008/views/async-forever.html -->

<ul id="stocks"></ul>

<script>

(function (async, $) {

$stockList = $('ul#stocks');

async.forever(function (cb) {

$.get('/dashboard/stocks')

.done(function (stocks) {

// refresh the stock list with new stock

// information

$stockList.empty();

$stockList.append(stocks.map(function (stock) {

return $('<li></li>').html(stock.symbol + ' $' + stock.price);

}));

// wait three seconds before continuing

setTimeout(function () {

cb(null);

}, 3000);

}).fail(cb);

}, function (err) {

console.error(err.responseText);

})

}(window.async, window.jQuery));

</script>

批量流动

本章介绍的最后一种控制流是批处理。批处理是通过将一些数据划分成块,然后一次对每个块进行操作来创建的。批处理有一些阈值,用于定义可以放入块中的数据量。在块上的工作开始后添加到批处理流中的数据被排队,直到工作完成,然后在新的块中被处理。

异步队列

异步队列是在批处理流中处理项目的一种方式。可以通过使用两个参数调用async.queue()来创建队列。第一个是任务函数,将为每个将被添加到队列中的数据项执行该函数。第二个是一个数字,表示队列将并发调度以处理数据的任务工作线程的最大数量。在清单 15-18 中,创建了一个队列来为添加到队列中的任何 URL 发出 HTTP 请求。当每个请求完成时,每个 HTTP 请求的结果将被添加到results散列中。任何时候可以运行的 HTTP 请求的最大数量是三。如果在三个请求正在进行的时候有额外的 URL 被添加到队列中,它们将被保留以供将来处理。当工人被释放时(当请求完成时),他们将根据需要被分配到排队的 URL。在给定的时间内,不会有超过三个 HTTP 请求正在进行。

Listing 15-18. Using Queue for Sequential Batches

// example-009/index.js

'use strict';

var async = require('async');

var http = require('http');

var MAX_WORKERS = 3;

var results = {};

var queue = async.queue(function (url, cb) {

results[url] = '';

http.get(url, function (res) {

results[url] = res.statusCode + ' Content-Type: ' + res.headers['content-type'];

cb(null);

}).on('error', function (err) {

cb(err);

});

}, MAX_WORKERS);

var urls = [ // 9 urls

'http://www.appendto.comT2】

'http://www.nodejs.orgT2】

'http://www.npmjs.orgT2】

'http://www.nicholascloud.comT2】

'http://www.devlink.netT2】

'http://javascriptweekly.comT2】

'http://nodeweekly.comT2】

'http://www.reddit.com/r/javascriptT2】

'http://www.reddit.com/r/nodeT2】

];

urls.forEach(function (url) {

queue.push(url, function (err) {

if (err) {

return console.error(err);

}

console.log('done processing', url);

});

});

队列将在其生命周期的某些点发出许多事件。可以将函数分配给队列对象上相应的事件属性,以处理这些事件。这些事件处理程序是可选的;无论有没有它们,队列都将正常运行。

当队列第一次达到活动工作者的最大数量时,它将调用分配给queue.saturated的任何函数。当队列正在处理所有项目并且没有其他项目排队时,它将调用分配给queue.empty的任何函数。最后,当所有的工人都完成并且队列为空时,任何分配给queue.drain的函数都会被调用。清单 15-19 中的函数处理每一个引发的事件。

Listing 15-19. Queue Events

// example-009/index.js

queue.saturated = function () {

console.log('queue is saturated at ' + queue.length());

};

queue.empty = function () {

console.log('queue is empty; last task being handled');

};

queue.drain = function () {

console.log('queue is drained; no more tasks to handle');

Object.keys(results).forEach(function (url) {

console.log(url, results[url]);

});

process.exit(0);

};

Note

emptydrained事件略有不同。当empty被触发时,尽管队列中没有剩余的项目,工人可能仍然是活动的。当drained被触发时,所有工人都停止工作,队列完全为空。

异步货物

async.cargo()函数类似于async.queue(),它将一些任务函数要处理的项目排队。然而,它们的不同之处在于工作负荷是如何划分的。async.queue()运行多个工作线程,直到达到最大并发限制——它的饱和点。async.cargo()一次运行一个工作线程,但是将队列中要处理的项目分成预定大小的有效负载。当 worker 被执行时,它将被赋予一个有效载荷。当它完成时,它将被给予另一个,直到所有的有效载荷被处理。那么,货物的饱和点是当满载的有效载荷准备好被处理时。工人启动后添加到货物中的任何物品都将被归入下一个待处理的有效负载中。

通过将任务函数作为第一个参数提供给async.cargo(),并将最大有效载荷大小作为第二个参数来创建货物。task 函数将接收一个要处理的数据数组(长度达到最大有效负载大小),并在操作完成后调用一个回调函数。

清单 15-20 中的代码展示了如何使用async.cargo()将一系列数据库更新打包到一个虚构的事务中,一次一个有效负载。task 函数遍历提供给它的“更新”对象,将每个对象转换成某个虚拟关系数据存储中的一个UPDATE查询。一旦所有的查询都被添加到事务中,事务就被提交,回调就被调用。

Listing 15-20. Using Cargo for Parallel Batches

// example-010/index-01.js

'use strict';

var async = require('async');

var db = require('db');

var MAX_PAYLOAD_SIZE = 4;

var UPDATE_QUERY = "UPDATE CUSTOMER SET ? = '?' WHERE id = ?;";

var cargo = async.cargo(function (updates, cb) {

db.begin(function (trx) {

updates.forEach(function (update) {

var query = UPDATE_QUERY.replace('?', update.field)

.replace('?', update.value)

.replace('?', update.id);

trx.add(query);

});

trx.commit(cb);

});

}, MAX_PAYLOAD_SIZE);

var customerUpdates = [ // 9 updates to be processed in payloads of 4

{id: 1000, field: 'firstName', value: 'Sterling'},

{id: 1001, field: 'phoneNumber', value: '222-333-4444'},

{id: 1002, field: 'email', value: 'archer@goodisis.com'},

{id: 1003, field: 'dob', value: '01/22/1973'},

{id: 1004, field: 'city', value: 'New York'},

{id: 1005, field: 'occupation', value: 'Professional Troll'},

{id: 1006, field: 'twitter', value: '@2cool4school'},

{id: 1007, field: 'ssn', value: '111-22-3333'},

{id: 1008, field: 'email', value: 'urmom@internet.com'},

{id: 1009, field: 'pref', value: 'rememberme=false&colorscheme=dark'}

];

customerUpdates.forEach(function (update) {

cargo.push(update, function () {

console.log('done processing', update.id);

});

});

货物对象与队列对象具有相同的事件属性,如清单 15-21 所示。主要区别在于,一旦添加了最大数量的有效载荷项目,就达到了货物的饱和极限,此时工人将开始工作。

可以根据需要将可选的函数处理程序分配给事件属性。

Listing 15-21. Cargo Events

// example-010/index-01.js

cargo.saturated = function () {

console.log('cargo is saturated at ' + cargo.length());

};

cargo.empty = function () {

console.log('cargo is empty; worker needs tasks');

};

cargo.drain = function () {

console.log('cargo is drained; no more tasks to handle');

};

Note

async.queue()async.cargo()都调度任务函数在事件循环的下一个节拍运行。如果项目被同步地一个接一个地添加到队列或货物中,那么每个项目的阈值将按预期被应用;队列将节流最大数量的工作人员,货物将划分最大数量的要处理的项目。但是,如果项是异步添加到每个任务中的,如果项是在事件循环的下一个直接循环之后添加的,则任务函数可能会在低于其最大容量的情况下被调用。

清单 15-22 中的代码从customerUpdates数组中取出每个更新,并将其推送到 cargo,然后将下一次推送安排在 500 毫秒后,在事件循环的下一次循环中发生。因为 cargo 会立即调度它的任务,所以UPDATE查询每次会运行一个——可能两个——更新,这取决于完成一个任务和调度下一个任务需要多长时间。

Listing 15-22. Adding Items to Cargo Asynchronously

// example-010/index-02.js

(function addUpdateAsync() {

if (!customerUpdates.length) return;

console.log('adding update');

var update = customerUpdates.shift();

cargo.push(update, function () {

console.log('done processing', update.id);

});

setTimeout(addUpdateAsync, 500);

}());

要保证队列和货物都满足最大阈值,请将项目同步推送到彼此。

摘要

本章介绍了一些常见的同步控制流,并演示了如何使用 Async.js 来为异步代码调整这些模式。表 15-1 显示了每个流程和相应的 Async.js 函数。

表 15-1。

Flows and Corresponding Async.js Functions

| 流动 | Async.js 函数 | | --- | --- | | 连续的 | async.series() | | 平行的 | async.parallel() | | 管道 | async.waterfall()async.seq() | | 环 | async.whilst() / async.doWhilst()async.until() / async.doUntil() | |   | async.retry()async.forever() | | 一批 | async.queue()async.cargo() |

顺序和并行流程允许开发人员执行多个独立的任务,然后根据需要汇总结果。管道流可用于将任务链接在一起,其中每个任务的输出成为后续任务的输入。为了将异步任务重复给定的次数,或者根据某些条件,可以使用循环流。最后,批处理流可以将数据分成块,一批接一批地异步处理。

通过巧妙地组织异步函数任务,协调每个任务的结果,并将错误和/或任务结果交付给最终回调,Async.js 帮助开发人员避免嵌套回调,并将传统的同步控制流操作带入 JavaScript 的异步世界。