Node.js 异步流程控制模式

Node.js 平台上 continuation-passing style 和异步 API 是规范。一些在同步代码中很简单的操作在异步代码中可能就需要特别注意,比如:

  • 遍历一组文件
  • 按顺序执行一系列任务
  • 等待一组操作完成

这些都要求开发者们采用新的方法和技术,首先要避免错误的实现,然后还要考虑执行效率和代码可读性;还要避免陷入 callback hell。(如果你的代码一直在横着长,而不是竖着长,就有问题了)

使用模式和纪律来驯服回调。像 async 这样的流程控制库可以极大地简化流程控制的问题,同时 CPS 也不是实现异步 API 的唯一方法。还有 Promises 和 ECMAScript 6 generators,很强大,也很灵活。

异步编程的困难所在

JavaScript 大量使用闭包和异步函数的 in-place 定义让编程体验非常平滑顺畅,我们不需要跳到代码的其他地方再编写新的函数,但同时也损失了一定的模块化,可重用性以及可维护性,最终会导致回调嵌套不受控制地扩散,函数大小不断增长,代码组织也会越来越差。大多数情况下,回调并不是功能上需要的,更多的是跟纪律相关,而不是异步编程的问题。【异步编程,编程纪律,绝对是根代码风格有关,仔细研读 google 和 airbnb 的 js 风格】预先知道我们的代码会变得非常笨重,并且给我们准备了非常好的预案和足够多的解决方案,是新手和专家的区别。

创建一个简单的网络蜘蛛

a little web spider – 一个命令行小程序,接受一个 web URL 作为输入,下载其内容到本地文件。用到了两个 npm 依赖:

  • request: 提高 HTTP 调用效率的一个库
  • mkdirp: 递归创建目录的一个小功能

spider.js

var request = require('request');
var fs = require('fs');
var mkdirp = require('mkdirp');
var path = require('path');
var utilities = require('./utilities');

function spider(url, callback) {
    var filename = utilities.urlToFilename(url);
    fs.exists(filename, function(exists) {      //[1]
        if (!exists) {
            console.log('Downloading ' + url);
            request(url, function(err, response, body) {    //[2]
                if (err) {
                    callback(err);
                } else {
                    mkdirp(path.dirname(filename), function(err) {  //[3]
                        if (err) {
                            callback(err);
                        } else {
                            fs.writeFile(filename, body, function(err) {    //[4]
                                if(err) {
                                    callback(err);
                                } else {
                                    callback(null, filename, true);
                                }
                            });
                        }
                    });
                }
            });
        } else {
            callback(null, filename, false);
        }
    });
}

上述代码执行的主要任务:

  1. 通过验证相应的文件是否应经被创建来检查URL是否已经被下载过了 fs.exists(filename, function(exists) ...
  2. 如果文件没有找到,就使用下列代码行来下载 URL:request(url, function(err, response, body) ...
  3. 然后确认将要包含该文件的目录是否存在: mkdirp(path.dirname(filename), function(err)...
  4. 最后,我们把HTTP响应包体写到文件系统:fs.writeFile(filename, body, function(err)...

从命令行参数读取 URL 并调用我们的 spider() 函数:

spider(process.argv[2], function(err, filename, downloaded) {
    if (err) {
        console.log(err);
    } else if (downloaded) {
        console.log('Completed the download of "' + filename + '"');
    } else {
        console.log('"' + filename + '" was already downloaded');
    }
});

安装依赖

node install

运行:

node spider http://www.example.com

回调地狱

回头看一下我们刚才的代码,虽然我们的逻辑很简单,实现也很直观,但还是有很多层的缩进,而且阅读起来也比较困难。如果我们用同步阻塞API来实现的话,代码看起来会直观很多。

callback hell可能是 node.js 里面最被大家熟知的反模式之一了。受这个问题影响的代码结构看起来像是下面这样:

asyncFoo(function(err) {
    asyncBar(function(err) {
        asyncFooBar(function(err) {
            [...]
        });
    });
});

形状看起来像是金字塔,因此也被口语化地称为 pyramid of doom。

上面这样的代码最明显的问题是可读性,最后你往往分不出一个函数是从哪里开始的,又是从哪里结束的。这种重叠引起的另一个问题是在每一个作用域中使用的变量名。通常情况下,我们会使用相似的或者是一致的名称来描述一个变量的内容。最好的例子就是由每一个回调所接受的 error 参数。有些人呢,就在不同的作用域里面使用相同名字的变种来区分 — 例如,err, error, err1, err2等等;还有的人就总是用相同的名字来隐藏作用域中变量的定义,例如只用 err。两种方法都不完美,都容易造成混淆以及增加引入错误的可能性。

闭包会带来性能上的代价和内存消耗。

注意:有关闭包是如何在 V8 中运行的一个非常棒的介绍,Vyacheslav Egorov ,Google 的一个 V8 工程师写的一篇博客:http://mrale.ph/blog/2012/09/23/grokking-v8-closures-for-fun.html

我们刚才的 spider() 函数就是一个回调地狱:不容易阅读,分不清函数的起止,变量名跨作用域混乱,…下面我们继续看一下有哪些方法来修复这个问题。

使用纯 JavaScript

回调地狱只是写异步代码会碰到的问题之一。比如在一个集合上遍历并按顺序应用一个异步操作,并不像在数组上调用一个 forEach() 那么简单,它实际上要求的是类似回调的技术。我们除了避免回调地狱之外,还有学习一些常见的控制流模式,只使用简单的纯 JavaScript。

回调纪律

写异步代码的第一条原则就是要记住在定义回调的时候不要滥用闭包。大多数情况下,修复回调地狱并不需要任何库,也不需要多么高级的技术,或者范式的改变,而只需要一些常识即可。

这里是一些基本原则,帮助我们保持嵌套层次在一个比较低的水平,同时改进我们的代码组织:

  • 尽早退出。根据上下文使用 return, continue, 或者 break,而不是写完整的 if/else 语句。这将帮助保持代码在一个比较浅的水平。
  • 为回调创建具名函数,把它们放在回调之外,并把中间结果作为参数传入。给函数命名也将有助于堆栈调试。
  • 代码模块化。把代码切成尽量小的,可以重用的函数。

使用回调纪律

第一步,去掉 else 语句,收到 error 之后马上从函数返回。从这样的代码:

if (err) {
    callback(err);
} else {
    // code to execute when there are no errors
}

改进到:

if (err) {
    return callback(err);
}
// code to execute when there are no errors

这样就减少了一些嵌套的层次,同时也不需要复杂的重构。

注意:一个常见的错误是忘记返回只调用:callback(err),而不是 return callback(err)。

第二个优化,甄别可以重用的代码。我们把写字符串到文件重构为一个单独的函数:

function saveFile(filename, contents, callback) {
    mkdirp(path.dirname(filename), function(err) {
        if (err) {
            return callback(err);
        }
        fs.writeFile(filename, contents, callback);
    });
}

依据相同的原则,我们可以创建一个名为 download() 的通用函数,接受一个 URL 和文件名作为输入,把 URL 下载到给定的文件中。在内部我们可以使用刚创建的 saveFile() 函数:

function download(url, filename, callback) {
    console.log("Downloading " + url);
    request(url, function(err, response, body) {
        if (err) {
            return callback(err);
        }
        saveFile(filename, body, function(err) {
            console.log('Downloaded and saved: ' + url);
            if (err) {
                return callback(err);
            }
            callback(null, body)
        });
    });
}

最后一步,修改 spider() 函数,感谢我们前面的这些修改,这个函数现在看起来像是这样了:

function spider(url, callback) {
    var filename = utilities.urlToFilename(url);
    fs.exists(filename, function(exists) {
        if (exists) {
            return callback(null, filename, false);
        }
        download(url, filename, function(err) {
            if (err) {
                return callback(err);
            }
            callback(null, filename, true);
        })
    });
}

功能和接口都保持没变,唯一改变的就是我们组织代码的方式。通过应用我们刚才讨论的这些基本原则,我们减少了嵌套,同时增加了代码的可读性和可测试性。总而言之,记住不要滥用闭包和匿名函数。

顺序执行

从分析 sequential execution flow 开始探索异步控制流程模式。

顺序执行一组任务意味着每次运行一个,一个接着一个。执行顺序是有关系的,而且必须保持,因为列表中任务的结果可能会影响后面任务的执行。这种流程有几个变种:

  • 顺序执行一组已知的任务,不需要链式也不需要传播结果
  • 使用一个任务的输出作为下一项任务的输入(通常也被称为 chain, pipline 或者 waterfall)
  • 在一个集合上迭代,同时在每个元素上运行一个异步任务,一个完成之后再下一个

顺序执行,在使用异步 CPS 的时候,通常是callback hell问题的主要原因。

顺序地执行一组已知任务

根据前面的示例,使用下面的模式来归纳一下解决方案:

function task1(callback) {
    asyncOperation(function() {
        task2(callback);
    });
}

function task2(callback) {
    asyncOperation(function(result) {
        task3(callback);
    });
}

function task3(callback) {
    asyncOperation(function() {
        callback();
    });
}

task1(function() {
    // task1, task2, task3 completed
});

顺序性迭代

刚讨论的模式只在我们事先知道有多少任务要执行的情况下会工作良好。这让我们可以按顺序硬编码下一个任务的调用;但是如果我们想在一个集合的每一个元素上执行一个异步操作怎么办?这种情况就没办法硬编码了,我们必须得动态构建。

网络蜘蛛版本 2

为了演示顺序迭代,我们给 web spider 引入一个新功能。我们现在希望循环下载包含在页面中的全部链接。我们将从页面中提取全部链接,然后在每一个上面循环调用网络蜘蛛,并且是按顺序的。

第一步是修改我们的 spider() 函数让它可以触发一个循环下载我们页面全部链接,通过使用一个名为 spiderLinks() 的函数。

还有,我们现在不在检查一个文件是否已经存在,而是尝试去读取它,然后开始爬它的链接;通过这种方式,我们可以继续打断的下载。最后一项修改,我们传递一个新的参数,nesting,用来帮助我们限定循环的深度,最终代码如下:

function spider(url, nesting, callback) {
    var filename = utilities.urlToFilename(url);
    fs.readFile(filename, 'utf8', function(err, body) {
        if(err) {
            if(err.code !== 'ENOENT') {
                return callback(err);
            }

            return download(url, filename, function(err, body) {
                if(err) {
                    return callback(err);
                }
                spiderLinks(url, body, nesting, callback);
            });
        }

        spiderLinks(url, body, nesting, callback);
    });
}

链接的顺序爬取

现在我们可以创建这个版本的爬虫的核心了,spiderLinks() 函数,使用顺序异步迭代算法下载一个 HTML 页面中的全部链接。

function spiderLinks(currentUrl, body, nesting, callback) {
    if(nesting === 0) {
        return process.nextTick(callback);
    }

    var links = utilities.getPageLinks(currentUrl, body);   //[1]
    function iterate(index) {       //[2]
        if(index === links.length) {
            return callback();
        }
        spider(links[index], nesting - 1, function(err) {   //[3]
            if (err) {
                return callback(err);
            }
            iterate(index + 1);
        });
    }
    iterate(0);     //[4]
}

新函数中比较重要的几步:

  1. 使用 utilities.getPageLinks() 函数获得包含在该页面中的全部链接的列表。这个函数只返回指向内部目标的链接(同一个域)
  2. 我们使用一个名为 iterate() 的本地函数迭代 links 集合,它接受下一个要进行分析的链接的 index。在此函数中,我们做的第一件事是检查索引是否等于 links 数组的长度,如果是的话,我们立马调用 callback() 函数,因为这意味着我们处理完全部项目了。
  3. 到这个时候,处理链接的准备工作都完成了。我们通过减少 nesting 级别来调用 spider() 函数,并且在操作完成时调用迭代的下一步。
  4. 作为 spiderLinks() 函数的最后一个步骤,我们通过调用 iterate(0) 来启动整个迭代过程。

我们刚才演示的算法可以让我们在一个数组上迭代,顺序执行一个异步操作,在这里就是 spider() 函数。

模式

spiderLinks() 函数说明迭代一个集合的同时对其应用一个异步操作是可能的。顺序异步迭代一个集合的该模式可以归纳如下:

function iterate(index) {
    if(index === tasks.length) {
        return finish();
    }
    var task = tasks[index];
    task(function() {
        iterate(index + 1);
    });
}

function finish() {
    // iteration completed
}

iterate(0);

我们可以上面的算法为基础实现 map 和 reduce 算法。(好像跟 underscore.js 很像啊)我们可以把它封装成一个如下的具有签名的函数:

iterateSeries(collection, iteratorCallback, finalCallback)

这个可以当做一个练习。

Notes: 模式(顺序迭代):通过创建一个名为 iterator 的函数顺序执行一列任务,该函数调用集合中下一个有效的任务,并确保在当前任务结束时调用迭代的下一步。

并行执行

有些情况下,一组异步任务的执行顺序并不重要,我们只需要的全部任务完成以后通知我们即可。这种情况更适合由并行执行流。

网络蜘蛛版本3

网络蜘蛛更适合并行下载。修改网络蜘蛛 spiderLinks(),让所有下载一下子全都开始(技巧主要是在回调函数 done 里面):

function spiderLinks(currentUrl, body, nesting, callback) {
    if(nesting === 0) {
        return process.nextTick(callback);
    }

    var links = utilities.getPageLinks(currentUrl, body);
    if(links.length === 0) {
        return process.nextTick(callback);
    }

    var completed = 0, errored = false;

    function done(err) {
        if (err) {
            errored = true;
            return callback(err);
        }
        if(++completed === links.length && !errored) {
            return callback();
        }
    }

    links.forEach(function(link) {
        spider(link, nesting - 1, done);
    });
}

spider() 任务现在是一下子全都开始,这可以通过简单地遍历 links 数组然后开始每一个任务开始。

links.forEach(function(link) {
    spider(link, nesting - 1, done);
});

让我们的应用等待全部任务都完成的技巧就在于给 spider() 函数提供了一个特殊的回调,我们叫做 done()。当一个 spider 任务完成的时候,done() 函数就会增加一个计数器。当下载完成的次数到达 links 数组的大小时,最终的callback 会被调用:

function done(err) {
    if (err) {
        errored = true;
        return callback(err);
    }
    if(++completed === links.length && !errored) {
        callback();
    }
}

模式

并行模式归纳:

var tasks = [...];
var completed = 0;
tasks.forEach(function(task) {
    task(function() {
        if (++completed === tasks.length) {
            finish();
        }
    });
});

function finish() {
    // all the tasks completed
}

稍微修改一下,我们就可以实现把每个任务的结果积累起来,来过滤或者映射一个数组的元素,或者一个给定数量的任务完成就来调用 finishe() 回调。【这部分到底算是函数式编程,还是怎么着?】最后一种情形被称为 competitive race。

模式(不受限的并行执行):通过立即全部派生来并行执行一组异步任务,然后通过技术它们的回调被调用的次数来等待所有任务完成。

在并发任务出现时纠正竞争状态(race conditions)

Node.js 的单线程并不意味着我们并不会有竞争状态,相反还会很普遍,主要是因为异步操作调用和他的结果通知之间的延迟。举个例子,我们最新版本的网络蜘蛛里面就有一个竞争状态。假如两个 spider 任务同时操作同一个 URL,可能就会在同一个文件上调用 fs.readFile(),一个没有找到开始下载去了,在这期间另一个也检查说没有文件,也去下载去了。

怎么修复呢?答案很简单,我们用一个数组变量做标记,动手之前先检查一下标识:

var spidering = {};
function spider(url, nesting, callback) {
    if (spidering[url]) {
        return process.nextTick(callback);
    }
    spidering[url] = true;
    [...]
}

竞争条件在单线程中也会引起很多问题,并且很难调试,运行并行任务的时候要仔细检查这种类型的情况。

受限的并发执行

实际上,并发一般都是受限的,好的工作流程应该是:

  • 初始化的时候,我们派生出不超过并发数限制的那么多任务
  • 每次一个任务完成的时候,再派生出一个或多个任务直到没有再次达到限制

限制并发

现在我们来提供一种模式使用受限的并发来执行一套给定的任务:

var tasks = [...];
var concurrency = 2, running = 0, completed = 0, index = 0;
function next() {   //[1]
    while(running < concurrency && index < tasks.length) {
        task = tasks[index++];
        task(function() {   //[2]
            if(completed === tasks.length) {
                return finish();
            }
            completed++, running--;
            next();
        });
        running++;
    }
}
next();

function finish() {
    // all tasks finished
}

这可以被认为是一个顺序执行和并发执行的混合。 任务没有全部完成,且运行数少于限制数的情况下,派生一个任务,每派生一个,运行数+1;有任务完成的时候,完成数+1,运行数-1,如果完成数等于任务总数,全部任务完成;循环以上

【这里面有一个问题,cpu 会很高吧?】

全局性地限制并发

注意 node.js 本身的 maxsockets 数量限制。0.11版本及之后的不同。

我们可以把刚才的限定模式应用到 spiderLinks() 上,但是问题是这里的限定是针对一个单一页面的。因为我们可以下载多个页面,所以每个页面又会派生出另外两个下载,导致全部加起来的下载数很多。

使用队列进行拯救

我们真正想要的是能够限制并行运行的全部数量,我们可以稍微修改一下之前的例子,但是这里我们决定引入一个新的机制 queues,来限制多个任务的并发。

TaskQueue 组合了一个队列和之前演示的算法。先从定义一个下面这样的构造器开始:

function TaskQueue(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
}

这个构造器的参数只有并发数限制,但是在此之外,它初始化了其他我们稍后需要的实例变量。

实现 pushTask() 方法:

TaskQueue.prototype.pushTask = function(task, callback) {
    this.queue.push(task);
    this.next();
}

上面函数简单地添加了一个新的任务到队列然后通过调用 this.next() 启动了工作者的运行。

next() 方法的角色是从队列派生一组任务,但不超过并发限制:

TaskQueue.prototype.next = function() {
    var self = this;
    while(self.running < self.concurrency && self.queue.length) {
        var task = self.queue.shift();
        task(function(err) {
            self.running--;
            self.next();
        });
        self.running++;
    }
}

这里面有两个好处,一个是我们可以在运行时动态修改并发限制数,另一个是我们现在有了一个共享的集中式限制并发实体,可以在所有函数运行中共享。

网络蜘蛛版本 4

加载依赖,创建 TaskQueue 实例:

var TaskQueue = require('./taskQueue');
var downloadQueue = new TaskQueue(2);

更新 spiderLinks() 函数:

function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }

    var links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }

    var completed = 0, errored = fasle;
    links.forEach(function(link) {
        downloadQueue.pushTask(function(done) {
            spider(link, nesting - 1, function(err) {
                if (err) {
                    errored = true;
                    return callback(err);
                }
                if (++completed === links.length && !errored) {
                    callback();
                }
                done();
            });
        });
    });
}
  • We run the spider() function by providing a custom callback.
  • In the callback, we check if all the tasks relative to this execution of the spiderLinks() function are completed. When this condition is true, we invoke the final callback of the spiderLinks() function.
  • At the end of our task, we invoke the done() callback so that the queue can continue its execution.

async 库

我们目前为止分析的这些控制流程都可以作为一个基础,来构建一个可以重用的,并且更通用的解决方案。我们可以把顺序执行,顺序执行并把上一步的输出作为下一步的输入,顺序执行进行map/reduce,并行执行,限定并发数的并行执行等这些算法都封装成可以重用的函数,这正是 async(https://npmjs.org/package/async)库所做的工作。async 在 Node.js 里面非常的流行,不仅提供了一组简化任务执行控制流的函数,还可以对集合进行处理。

顺序执行

async 提供的方法太多,因此针对要解决的问题挑选正确的方法反而成了一个问题,需要一定的经验和实践。我们从网络蜘蛛版本2来改造成使用 async。首先我们先安装一下 async:

npm install async	

然后我们从 spider.js 模块中加载一个新的依赖:

var async = require('async');

顺序执行已知的一组任务

我们先来修改 download() 函数。我们知道它主要是顺序执行下面三个任务:

  1. 下载一个 URL 的内容。
  2. 如果目录不存在的话则创建一个新的。
  3. 把 URl 的内容保存到文件里面。

async 中最适合的函数是 async.series(),签名如下:

async.series(tasks, [callback])

它接受一个任务列表和一个回调函数,在全部任务结束后会调用该回调函数。每个任务都是一个接受一个回调函数的函数,在任务完成后必须调用该回调函数:

function task(callback) {}

async 使用跟 Node.js 相同约定的回调,它会自动处理错误的传递。所以,如果任何一个任务调用其回调函数时如果带着一个错误,async 将忽略列表中剩下的任务而直接跳到最后一个回调。

根据这种思想,我们看一下使用 async 的 download() 函数怎么改变的:

function download(url, filename, callback) {
	console.log('Downloading' + url );
	var body;
	
	async.series([
		function(callback) {	//[1]
			request(url, function(err, response, resBody) {
				if (err) {
					return callback(err);
				}
				body = resBody;
				callback();
			});
		},
		mkdirp.bind(null, path.dirname(filename)),	//[2]
		function(callback) {	//[3]
			fs.writeFile(filename, body, callback);
		}
	], function(err) {	//[4]
		console.log('Downloaded and saved: ' + url);
		if (err) {
			return callback(err);
		}
		callback(null, body);
	});
}

使用 async 我们不再需要嵌套 callback,只需要提供一组扁平化的任务列表即可,通常每个任务就是一个异步操作,async 将会顺序执行它们,这样就不会有 callback hell 了。我们是这样定义每一个任务的:

  1. 第一个任务参与的是 URl 的下载。同样我么还把响应包体保存到了一个闭包变量(body)里面,这样我们就可以在和其他任务共享了。
  2. 在第二个任务里面,我们希望创建一个承载下载页面的目录。我们通过mkdirp()函数的部分应用来实现,绑定了要创建的目录的路径,我们能省一些代码行并且增加了它的可读性。
  3. 最后,我们把下载的 URL 的内容写到了一个文件。这种情况下我们不能像任务2那样执行部分应用了,因为 body 只有在第一个任务执行完成之后才有效。然而我们仍然可以节省一些代码行,通过发挥 async 的自动化错误管理,通过简单地传入任务回调给 fs.writeFile() 函数。
  4. 所有任务都完成以后,async.series() 的最终的回调将被调用。在我们这个例子里,我们只是简单做了一些错误管理,然后就把 body 变量返回给 download() 函数的回调了。

我们在开发过程中顺序执行碰到很多的还有一种就是需要把上一步的输出作为下一步的输入,这时候可以使用 async.waterfall()。在我们这个例子中,我们可以使用该功能来传递 body 变量直到序列结束。这个可以作为我们的一个练习,然后看一下区别。

顺序迭代

我们可以使用 async 的 async.eachSeries() 来改造我们的 spiderLinks() 函数,也就是在集合上进行迭代的方法,如下:

function spiderLinks(currentUrl, body, nesting, callback) {
	if (nesting === 0) {
		return process.nextTick(callback);
	}
	
	var links = utilities.getPageLinks(currentUrl, body);
	if (links.length === 0) {
		return process.nextTick(callback);
	}
	
	async.eachSeries(links, function(link, callback) {
		spider(link, nesting - 1, callback);
	}, callback);
}

和我们自己使用纯 JavaScript 实现的同样的函数相比,我们能够注意到 async 给我们带来的在代码组织和可读性方面巨大的益处。

并行执行

我们使用 async 的函数来改造我们的网络蜘蛛版本3,也就是无受限并行流程。用 async.each:

function spiderLinks(currentUrl, body, nesting, callback) {
	[...]
	async.each(links, function(link, callback) {
		spider(link, nesting - 1, callback);
	}, callback);
}

代码跟我们的顺序版本是一样的,我们只是把 async.eachSeries() 换成了 async.each()。也就是说我们的重点放在应用程序逻辑即可,控制流程的工作可以都交给 async 来做。

受限的并行执行

受限类的 async 函数,eachLimit(), mapLimit(), parallelLimit(), queue() 和 cargo()。async 的 async.queue() 函数工作方式跟 TaskQueue 类很类似:

var q = async.queue(worker, concurrency);

worker() 函数接受一个 task 来运行,以及一个 callback 来调用,在任务完成的时候:

function worker(task, callback)

往队列里面添加新任务使用 q.push(task, callback)。跟一个任务相关的 callback 必须被 worker 回调,在任务处理完成之后。我们使用 async.queue() 实现版本4。首先创建一个新的队列:

var downloadQueue = async.queue(function(taskData, callback) {
	spider(taskData.link, taskData.nesting - 1, callback);
}, 2);

代码是比较直观的,我们创建了一个新的队列,设置并发限制为2,还有一个简单地调用我们的 spider() 函数的 worker,采用一个跟任务相关联的数据。然后,我们实现 spiderLinks() 函数:

function spiderLinks(currentUrl, body, nesting, callback) {
	if (nesting === 0) {
		return process.nextTick(callback);
	}
	var links = utilities.getPageLinks(currentUrl, body);
	if (links.length === 0) {
		return process.nextTick(callback);
	}
	var completed = 0, errored = false;
	links.forEach(function(link) {
		var taskData = {link: link, nesting: nesting};
		downloadQueue.push(taskData, function(err) {
			if (err) {
				errored = true;
				return callback(err);
			}
			if (++completed === links.length && !errored) {
				callback();
			}
		});
	});
}

使用 async 我们避免了从零开始写异步控制流程模式,从而可以为我们节省大量的时间和精力,还有代码行数。

Promises

CPS 并不是编写异步代码的唯一方法,promises 及其实现,遵照 Promises/A+ 规范(https://promisesaplus.com)。

什么是 promise?

promises 是允许异步函数返回称为一个 promise 对象的抽象,它代表的是该操作的最终结果。用 promises 的话来说,如果该异步操作尚未完成,我们就说一个 promise 是 pending 的,当操作成功结束的时候我们说它是 fulfilled 的,并且当操作随着一个错误终止的时候我们说它是 rejected 的。一旦一个 promise 是 fulfilled 或者是 rejected,都被认为是 settled。

要接受 fufillment 的值或者跟 rejection 相关连的error(reason)的话,可以使用promise的 then() 方法。下面是它的签名:

promise.then([onFulfilled], [onRejection])

这里的 onFulfilled() 是一个最终将接受promise的 fulfillment 值的函数,而 onRejected() 是另一个将接受 rejection 原因的函数。两个都是可选的。要想了解 Promise 会如何转换我们的代码,让我们考虑下面的例子:

asyncOperation(arg, function(err, result) {
	if (err) {
		// handle error
	}
	// do stuff with result
});

Promise 能够转换这种典型的 CPS 代码到一种更好的结构和更优雅的代码,如下所示:

asyncOperation(arg)
	.then(function(result) {
		// do stuff with result
	}, function(err) {
		// handle error
	});

then() 方法一个至关重要的属性是它同步返回另一个 promise。如果任何一个 onFulfilled() 或者 onRejected() 方法返回一个值 x,那么由 then() 方法返回的 promise 将按照下面这样:

  • 如果 x 是一个值的话,则带着 x Fulfill
  • 如果 x 是一个 promise 或者一个 thenable 的话,使用 x 的 fulfillment 值进行 Fulfill
  • 如果 x 是一个 promise 或者一个 thenable 的话,使用 x 的最终 rejection reason 来 Reject

Notes: 一个 thenable 是一个跟 promise 类似的对象,也有 then() 方法。这个词别用于表示在正在使用的特定 promise 实现外面的一个 promise。

这个功能能让我们构建 promise 链,能够更容易地在几个配置中聚合和安排异步操作。同样,如果我们没有指定一个 onFulfilled() 或者 onRejected() 句柄,那么 fulfillment 的值或者 rejection 的原因会被自动前传到链中的下一个promise。这能够让我们跨整个链中自动传递错误知道由一个 onRejected() 句柄捕获。使用 promise 链,顺序执行的任务突然就变成了一个很平常的操作:

asyncOperation(arg)
	.then(function(result1) {
		// returns another promise
		return asyncOperation(arg2);
	})
	.then(function(result2) {
		// return a value
		return 'done';
	})
	.then(undefined, function(err) {
		// any error in the chain is caught here
	});

promises 的另一个重要属性是 onFulfilled() 和 onRejected() 函数都被确保被异步调用。最棒的地方在于如果一个异常被从 onFulfilled() 或者 onRejected() 处理器中抛出,由 then() 方法返回的 promise 会使用这个异常作为 rejection reason 自动拒绝。这个比 CPS 有优势多了,因为这意味着使用 promise,异常将自动跨链传递。

Note: 有关 Promise/A+ 规范的详细描述,参见官方网站 http://promises-aplus.github.io/promises-spec/

Promises/A+ 实现

下面是一些最常见的 Promises/A+ 实现:

它们之间真正的区别是它们在规范之上提供的额外功能。规范实际只定义了 then() 方法的功能以及 promise 的解析过程,并没有指定其他功能。例如,一个 promise 是如何从一个基于回调的异步函数创建的。

在我们的示例中,我们尝试使用一套由 ES6 promises 实现的 API。因为它们将变成 JavaScript 内置的无需任何额外的库进行支持。幸运的是,上面这些库都平滑地适配支持 ES6 API。

注意:现在由 Node.js 所使用 V8 尚未原生支持 promise.

作为参考,现在由 ES6 promises 所提供的 API 列表:

  • Constructor(new Promise(function(resolve, reject) {})): 创建一个新的 promise,fulfill 或 reject 基于作为一个参数传给它的函数的行为。构造器的参数解释如下:
    • resolve(obj):使用一个 fulfillment 值来决定 promise,这个值将是 obj,如果 obj 是一个值的话。如果 obj 是一个 promise 或者 thenable 的话,它将是 obj 的 fulfilemnt 值。
    • reject(err):使用原因 err 拒绝 promise。通常是让 err 成为 Error 的一个实例。
  • Promise 对象的静态方法:
    • Promise.resolve(obj): 从一个 thenable 或者一个值来创建一个新的 promise
    • Promise.reject(err): 创建一个使用 err 作为原因进行拒绝的 promise
    • Promise.all(array): 创建一个 promise,使用一个 fulfillment 值数组进行 fulfill,当数组中所有项目都 fulfill 时,并且在任何一个项目 reject 时,使用第一个 rejection reason 进行 reject。数组中的每一个项目都可以是一个 promise, 一个普通的 thenable 或者一个值
  • 一个 Promise 实例的方法:
    • promise.then(onFulfilled, onRejected):这是 promise 最基本的方法。跟之前描述的规范相符。
    • promise.catch(onRejected): 跟 promise.then(undefined, onRejected) 句法不同而已

Notes: jQuery 和 Q 实现的 then 好像都是叫做 deferreds。

Promisifying 一个 Node.js 风格的函数

只有少量的库自身提供 promise,多数情况下,我们要把一个典型的基于回调的函数转换成能够返回一个 promise,这也被称做 promisification。

使用 Promise 对象的构造器。我们创建一个新的名为 promisify() 的函数并把它包含进 utilities.js 模块中:

var Promise = require('bluebird');

module.exports.promisify = function(callbackBasedApi) {
	return function promisified() {
		var args = [].slice.call(arguments);
		return new Promise(function(resolve, reject) {	//[1]
			args.push(function(err, result) {	//[2]
				if (err) {
					return reject(err);		//[3]
				}
				if (arguments.length <= 2) {	//[4]
					resolve(result);
				} else {
					resolve([].slice.call(arguments, 1));
				}
			});
			callbackBasedApi.apply(null, args);	//[5]
		});
	}
};

上面的函数返回了另一个叫做 promisified() 的函数,代表的是作为输入给出的 callbackBasedApi 的 promisified 版本。主要做了下列工作:

  1. promisified() 函数创建了一个新的 promise,使用 Promise 构造器,并且立即把它返回给调用者。
  2. 在传给 Promise 构造器的函数中,我们确保传给 callbackBasedApi 一个特别的回调。因为我们都知道回调总是最后一个,所以我们就简单地把它附加到提供给 promisified() 函数的参数列表中。
  3. 在这个特殊回调中,如果我们接收到一个错误,我们就立马拒绝 promise。
  4. 如果没有错误,我么使用一个值或者一个值的数组来决定 promise,取决于有多少结果传给了回调
  5. 最后,我们简单地调用 callbackBasedApi 使用我们已经构建的参数列表。

Notes:大多数 promise 的实现已经提供了一些帮助类来转换一个 Node.js 风格的 API 到返回一个 promise。比如,Q 有 Q.denodeify() 和 Q.nbind(),Bluebird 有 Promise.promisify(),以及 When.js 的 node.lift()。

顺序执行

我们直接把版本2,顺序下载一个页面中的链接,转换成使用 promise。

在 spider.js 模块中,第一步我们先加载我们的 promises 实现并且 promisify 我们打算用的回调函数:

var Promise = require('bluebird');
var utilities = require('./utilities');

var request = utilities.promisify(require('request'));
var mkdirp = utilities.promisify(require('mkdirp'));
var fs = require('fs');
var readFile = utilities.promisify(fs.readFile);
var writeFile = utilities.promisify(fs.writeFile);

现在我们可以开始转换 download() 函数了:

function download(url, filename) {
	console.log('Downloading ' + url);
	var body;
	return request(url)
		.then(function(results) {
			body = results[1];
			return mkdirp(path.dirname(filename));
		})
		.then(function() {
			return writeFile(filename, body);
		})
		.then(function() {
			console.log('Downloading and saved: ' + url);
			return body;
		});
}

这样的代码很优雅,调用者只有在操作(request, mkdirp, writeFile)全部完成以后才会收到一个 fulfill body 的一个 promise。现在开始修改 spider() 函数:

function spider(url, nesting) {
	var filename = utilities.urlToFilename(url);
	return readFile(filename, 'utf8)
		.then(
			function(body) {
				return spiderLinks(url, body, nesting);
			},
			function(err) {
				if (err.code !== 'ENOENT') {
					throw err;
				}
				
				return download(url, filename)
					.then(function(body) {
						return spiderLinks(url, body, nesting);
					});
				}
			);
		}

要注意的是我们还给 readFile 的 promise 传了一个 onRejected() 函数,来处理页面还没有被下载(文件找不到)的情况。现在我们同样来修改 spider() 函数的主要调用如下:

spider(process.argv[2], 1)
	.then(function() {
		console.log('Download complete');
	})
	.catch(function(err) {
		console.log(err);
	});

注意我们第一次使用 catch 来处理源自 spider 的错误情况。现在还只剩下 spiderLinks() 函数了。

使用 promise 相对 CPS 节省了大量错误传递的逻辑。

顺序迭代器

截至目前我们都在说 promise 是什么,使用 promise 实现一组顺序执行的任务有多优雅多简单,下面我们来看一下怎么用 promise 实现一个迭代。也就是我们要用 promise 来修改一下我们版本2的 spiderLinks() 函数:

function spiderLinks(currentUrl, body, nesting) {
	var promise = Promise.resolve();	//[1]
	if (nesting === 0) {
		return promise;
	}
	var links = utilities.getPageLinks(currentUrl, body);
	links.forEach(function(link) {	//[2]
		promise = promise.then(function() {
			return spider(link, nesting - 1);
		});
	});
	
	return promise;
}

要异步迭代一个 web 页面的全部链接,我们需要动态绑定一个 promises 链:

  1. 首先,我们定义了一个空的 promise, 决定到 undefined。该 promise 只是用来作为构建我们链的一个起点。
  2. 然后,在循环里面,我们使用一个新的获取自在链中前一个 promise 上调用 then() 得到的 promise 来更新 promise 变量。这就是我们实际的使用 promise 的异步迭代模式。

使用这种方式,循环的最后,该 promise 变量将包含循环中最后一个 then() 调用,所以它将在链中的所有 promise 都决定之后才决定。

顺序迭代器 – 模式

我们提炼一下顺序迭代一组 promise 的模式:

var tasks = [...]
var promise = Promise.resolve();
tasks.forEach(function(task) {
	promise = promise.then(function() {
		return task();
	});
});

promise.then(function() {
	// All tasks completed
});

替代使用 forEach() 循环的另一种方法是使用 reduce(),代码看起来更加紧凑:

var tasks = [...]
var promise = tasks.reduce(function(prev, task) {
	return prev.then(function() {
		return task();
	});
}, Promise.resolve());

promise.then(function() {
	// All tasks completed
});

Pattern(sequential iteration with promises):使用一个循环动态构建一个promise的链。

并行执行

另一个使用 promise 后变得轻松的是并行执行流程。我们要做的只是使用内置的 Promise.all() 来创建另一个 promise。我们更新一下版本3的 spiderLinks() 函数,使用 promise:

function spiderLinks(currentUrl, body, nesting) {
	if (nesting === 0) {
		return Promise.resolve();
	}
	var links = utilities.getPageLinks(currentUrl, body);
	var promises = links.map(function(link) {
		return spider(link, nesting - 1);
	});
	
	return Promise.all(promises);
}

在 elements.map() 上全部开始 spider() 任务,收集它们全部的 promises。我们使用 Promise.all() 方法,它在数组中的全部 promise fulfill 之后被 fulfill。换句话说,它在全部下载任务完成之后再被调用,正是我们所希望的那样。

受限的并行执行

ES6 Promise API 本身没有提供一种受限执行的控制流,但是我们可以把之前用纯 JavaScript 实现的思想借鉴过来。 // 本部分剩余代码(内容)待补充,from page 98(117)

Generators

也被称为 semi-coroutines,ES 6 规范引入的另一种能够简化异步控制流程代码的机制。generator 跟函数类似,但是可以被挂起,通过 yield 语句,然后在后面可以被恢复,尤其在实现迭代器时非常有用。

注意:在 node.js 里面,generator 从 0.11 开始有效,但是默认关闭,需要通过 –harmony 或者 –harmony-generators 标记来让 generators 工作。

基础

generator 函数的语法,在 function 关键词后面加一个*号:

function* makeGenerator() {
	// body
}

在 makeGenerator() 函数内部,我们可以使用关键词 yield 来暂停执行并返回给调用者传给它的值:

function* makeGenerator() {
	yield 'Hello World';
	console.log('Re-entered');
}

一个简单的示例

生成器作为迭代器

把值传回生成器

使用生成器异步控制流

使用 co: 基于生成器的控制流

顺序执行

并行执行

受限的并行执行

对比

已经提供了几种异步控制流程模式的对比了。

小结

回复