【译】使用async/await玩函数式编程

06 May 2017

异步 async/await

Async/await使得像for循环,if表达式和try/catch等这样的块级命令结构可以很容易的结合异步行为。不同的是,它对功能结构的处理与forEachmapreducefilter等函数不同。async异步功能结构的行为是乎令人费解。 这篇文章,我将向你展示在JavaScript的内置数组函数封装为async异步函数时遇到的一些陷阱以及如何解决它。

注意:以下的代码只在Node v.7.6.0+版本测试通过,以下例子只供参考和学习。我不建议在生产中使用它。

动机和 forEach

forEach会同步的顺序的为数组的每一个元素都执行一次函数。例如,下面的JavaScript代码会打印[0-9]:

function print(n) {
  console.log(n);
}

function test() {
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(print);
}

test();

不幸的是,异步函数就变得微妙起来。以下JavaScript代码会反序输出[0-9]:

async function print(n) {
  // Wait 1 second before printing 0, 0.9 seconds before printing 1, etc.
  await new Promise(resolve => setTimeout(() => resolve(), 1000 - n * 100));
  // Will usually print 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 but order is not strictly
  // guaranteed.
  console.log(n);
}

async function test() {
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(print);
}

test();

尽管2个函数都是异步的,Node.js不会等到第一个print()执行完成后再去执行下一个! 可以就只使用一个await吗?看看效果:

async function test() {
  // SyntaxError: Unexpected identifier
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(n => { await print(n); });
}

不能像上面只使用一个await,不然你就是Star Fox,这样写有语法问题的,因为await必须在async当前代码作用域内。在这一点上,你可以放弃,改为使用非标准Promise.series()函数。假如你意识到async函数只是返回Promise函数,那么你可以在.reduce()中使用Promise的链式调用来实现一个顺序的forEach()

async function print(n) {
  await new Promise(resolve => setTimeout(() => resolve(), 1000 - n * 100));
  console.log(n);
}

async function test() {
  // This is where the magic happens. Each `print()` call returns a promise,
  // so calling `then()` chains them together in order and prints 0-9 in order.
  await [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].
    reduce((promise, n) => promise.then(() => print(n)), Promise.resolve());
}

test();

你完全可以把这个函数改成名为forEachAsync的函数:

async function print(n) {
  await new Promise(resolve => setTimeout(() => resolve(), 1000 - n * 100));
  console.log(n);
}

Array.prototype.forEachAsync = function(fn) {
  return this.reduce((promise, n) => promise.then(() => fn(n)), Promise.resolve());
};

async function test() {
  await [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].forEachAsync(print);
}

test();

map()filter()的链式调用

JavaScript有一个很大的优势那就是数组方法是可以链式调用的。下面的代码主要做的事是,根据你提供的id数组分别到数据库db1db2查询到你想要的对应id的文本内容,过滤掉db2数据库的部分,然后把db1剩下的部分保存到db2数据库。虽然希望你乎略业务功能,但是里面还是有很多的中间值。

const { MongoClient } = require('mongodb');

async function copy(ids, db1, db2) {
  // Find all docs from db1
  const fromDb1 = await db1.collection('Test').find({ _id: { $in: ids } }).sort({ _id: 1 }).toArray();
  // And db2
  const fromDb2 = await db2.collection('Test').find({ _id: { $in: ids } }).sort({ _id: 1 }).toArray();

  // Find all docs from db1 that aren't in db2
  const toInsert = [];
  for (const doc of fromDb1) {
    if (!fromDb2.find(_doc => _doc._id === doc._id)) {
      toInsert.push(doc);
      console.log('Insert', doc);
    }
  }
  // And insert all of them
  await db2.collection('Test').insertMany(toInsert);
}

async function test() {
  const db1 = await MongoClient.connect('mongodb://localhost:27017/db1');
  const db2 = await MongoClient.connect('mongodb://localhost:27017/db2');
  await db1.dropDatabase();
  await db2.dropDatabase();

  const docs = [
    { _id: 1 },
    { _id: 2 },
    { _id: 3 },
    { _id: 4 }
  ];
  await db1.collection('Test').insertMany(docs);
  // Only insert docs with _id 2 and 4 into db2
  await db2.collection('Test').insertMany(docs.filter(doc => doc._id % 2 === 0));

  await copy(docs.map(doc => doc._id), db1, db2);
}

test();

函数体希望做到尽可能的干净——你只需要这样做ids.map().filter().forEach(),但是map(),filter()each()中的任何一个都需要封装为异步函数。我们上面已经实现过forEachAsync(),照葫芦画瓢,实现mapAsync()filterAsync()应该不会很难。

Array.prototype.mapAsync = function(fn) {
  return Promise.all(this.map(fn));
};

Array.prototype.filterAsync = function(fn) {
  return this.mapAsync(fn).then(_arr => this.filter((v, i) => !!_arr[i]));
};

然而,链式调用却会出现问题。你怎么同时链式调用mapAsync()filterAsync()?你可能会考虑用then(),但是这样调用不够整洁。相反,你应该创建一个AsyncArray的类并且接受和保存一个Promise实例,这个Promise实例最终会返回一个数组。并且在这个类添加上我们创建的mapAsync,filterAsyncforEachAsync方法:

class AsyncArray {
  constructor(promise) {
    this.$promise = promise || Promise.resolve();
  }

  then(resolve, reject) {
    return new AsyncArray(this.$promise.then(resolve, reject));
  }

  catch(reject) {
    return this.then(null, reject);
  }

  mapAsync(fn) {
    return this.then(arr => Promise.all(arr.map(fn)));
  }

  filterAsync(fn) {
    return new AsyncArray(Promise.all([this, this.mapAsync(fn)]).then(([arr, _arr]) => arr.filter((v, i) => !!_arr[i])));
  }

  forEachAsync(fn) {
    return this.then(arr => arr.reduce((promise, n) => promise.then(() => fn(n)), Promise.resolve()));
  }
}

通过使用AsyncArray,就可以链式的调用mapAsync(), filterAsync()forEachAsync(),因为每个方法都会返回AsyncArray本身。现在我们再来看看上面的例子的另一种实现:

async function copy(ids, db1, db2) {
  new AsyncArray(Promise.resolve(ids)).
    mapAsync(function(_id) {
      return db1.collection('Test').findOne({ _id });
    }).
    filterAsync(async function(doc) {
      const _doc = await db2.collection('Test').findOne({ _id: doc._id });
      return !_doc;
    }).
    forEachAsync(async function(doc) {
      console.log('Insert', doc);
      await db2.collection('Test').insertOne(doc);
    }).
    catch(error => console.error(error));
}

async function test() {
  const db1 = await MongoClient.connect('mongodb://localhost:27017/db1');
  const db2 = await MongoClient.connect('mongodb://localhost:27017/db2');
  await db1.dropDatabase();
  await db2.dropDatabase();

  const docs = [
    { _id: 1 },
    { _id: 2 },
    { _id: 3 },
    { _id: 4 }
  ];

  await db1.collection('Test').insertMany(docs);
  // Only insert docs with _id 2 and 4 into db2
  await db2.collection('Test').insertMany(docs.filter(doc => doc._id % 2 === 0));

  await copy(docs.map(doc => doc._id), db1, db2);
}

test();

封装 reduce()

现在我们已经封装了mapAsync(), filterAsync()forEachAsync(),为什么不以相同的方式实现reduceAsync()

reduceAsync(fn, initial) {
    return Promise.resolve(initial).then(cur => {
      return this.forEachAsync(async function(v, i) {
        cur = await fn(cur, v, i);
      }).then(() => cur);
    });
  }

看看reduceAsync()如何使用:

async function test() {
  const db = await MongoClient.connect('mongodb://localhost:27017/test');
  await db.dropDatabase();

  const docs = [
    { _id: 1, name: 'Axl' },
    { _id: 2, name: 'Slash' },
    { _id: 3, name: 'Duff' },
    { _id: 4, name: 'Izzy' },
    { _id: 5, name: 'Adler' }
  ];

  await db.collection('People').insertMany(docs);

  const ids = docs.map(doc => doc._id);

  const nameToId = await new AsyncArray(Promise.resolve(ids)).
    reduceAsync(async function (cur, _id) {
      const doc = await db.collection('People').findOne({ _id });
      cur[doc.name] = doc._id;
      return cur;
    }, {});
  console.log(nameToId);
}

test();

到这里,我们已经可以异步的使用map(),filter(),reduce()forEach()函数,但是需要自己进行封装函数并且里面的Promise调用链很复杂。我很期待,有一个人能写出一个Promise版的库来无缝操作数组。函数式编程使得同步操作数组变得清洁和优雅,通过链式调用省掉了很多不必要的中间值。添加帮助库,操作Promise版的数组确实有点让人兴奋。

Async/Await虽然用处非常大,但是如果你使用的是Node.js 4+或者是Node.js 6+ 长期稳定版(Node.js 8 延迟发布),引入co你仍然可以在使用类似的函数式编程模式中使用ES6 generator。如果你想深入研究co并且想自己写一个类似的库,你可以点击查看我写的这本书:《The 80/20 Guide to ES2015 Generators》

原文:http://thecodebarbarian.com/basic-functional-programming-with-async-await.html

译者:Jin

作者:Valeri Karpov