koa源码解析

深入理解中间件和洋葱模型

Posted by Li Yucang on June 9, 2023

koa源码解析

koa是什么

koa 是由 Express 原班人马打造的,致力于成为一个更小、更富有表现力、更健壮的 Web 框架。使用 koa 编写 web 应用,通过组合不同的 generator,可以免除重复繁琐的回调函数嵌套,并极大地提升错误处理的效率。koa 不在内核方法中绑定任何中间件,它仅仅提供了一个轻量优雅的函数库,使得编写 Web 应用变得得心应手。

koa是一个精简的node框架,它主要做了以下事情:

  • 基于node原生req和res为request和response对象赋能,并基于它们封装成一个context对象。

  • 基于async/await(generator)的中间件洋葱模型机制。

特点

  • 轻量、无捆绑

  • 中间件架构

  • 通过不同的 generator 以及 await/async 替代了回调

  • 增强的错误处理

  • 简单易用的api

koa1 vs koa2

koa1和koa2在源码上的区别主要是于对异步中间件的支持方式的不同。

  • koa1是使用generator、yield的模式。

  • koa2使用的是async/await+Promise的模式。

本文主要是针对koa2版本源码。

Express vs Koa

下面这张图可以直观的看到Express和koa在功能上的区别,此图来自于官方文档:

1、koa更类似于React或者Vue,许多功能是通过另外的中间件插件实现的,比如路由、模板等,Express更类似于Angular,提供了一体化的解决方案。

2、koa显式推荐async/await语法,Express其实也可以用,不过没那么积极。

3、koa是纯正的洋葱模型,Express只能针对请求进行拦截。

初读koa源码

如果你看了koa的源码,会发现koa源码其实很简单,共4个文件。

── lib
   ├── application.js
   ├── context.js
   ├── request.js
   └── response.js

这4个文件其实也对应了koa的4个对象:

── lib
   ├── new Koa()  || ctx.app
   ├── ctx
   ├── ctx.req  || ctx.request
   └── ctx.res  || ctx.response

下面,我们先初步了解koa的源码内容。

application.js

为入口文件,它继承了 Emitter 模块,Emitter 模块是 NodeJS 原生的模块,简单来说,Emitter 模块能实现事件监听和事件触发能力

/**
 * 依赖模块,包括但不止于下面的,只列出核心需要关注的内容
 */
const response = require('./response');
const compose = require('koa-compose');
const context = require('./context');
const request = require('./request');
const Emitter = require('events');
const convert = require('koa-convert');

/**
 * 继承Emitter,很重要,说明Application有异步事件的处理能力
 */
module.exports = class Application extends Emitter {
  constructor() {
    super();

    this.middleware = [];  // 该数组存放所有通过use函数的引入的中间件函数
    this.subdomainOffset = 2;  // 需要忽略的域名个数
    this.env = process.env.NODE_ENV || 'development';

    // 通过context.js、request.js、response.js创建对应的context、request、response。为什么用Object.create下面会讲解
    this.context = Object.create(context);
    this.request = Object.create(request);
    this.response = Object.create(response);
  
  }

  // 创建服务器
  listen(...args) {
    debug('listen');
    const server = http.createServer(this.callback());  //this.callback()是需要重点关注的部分,其实对应了http.createServer的参数(req, res)=> {}
    return server.listen(...args);
  }

   /*
      通过调用koa应用实例的use函数,形如:
      app.use(async (ctx, next) => {
          await next();
      });
      来加入中间件
    */
  use(fn) {
    if (isGeneratorFunction(fn)) {
       fn = convert(fn);  // 兼容koa1的generator写法,下文会讲解转换原理
    }
    this.middleware.push(fn); // 将传入的函数存放到middleware数组中
    return this;
  }

  // 返回一个类似(req, res) => {}的函数,该函数会作为参数传递给上文的listen函数中的http.createServer函数,作为请求处理的函数
  callback() {
    // 将所有传入use的函数通过koa-compose组合一下
    const fn = compose(this.middleware);

    const handleRequest = (req, res) => {
      // 基于req、res封装出更强大的ctx,下文会详细讲解
      const ctx = this.createContext(req, res);
      // 调用app实例上的handleRequest,注意区分本函数handleRequest
      return this.handleRequest(ctx, fn);
    };

    return handleRequest;
  }

  // 处理请求
  handleRequest(ctx, fnMiddleware) {
     // 省略,见下文
  }

  // 基于req、res封装出更强大的ctx
  createContext(req, res) {
    // 省略,见下文
  }
};

Application 在其原型上提供了 listen、toJSON、inspect、use、callback、handleRequest、createContext、onerror 等八个方法,其中

  • listen:提供 HTTP 服务

  • use:中间件挂载

  • callback:获取 http server 所需要的 callback 函数

  • handleRequest:处理请求体

  • createContext:构造 ctx,合并 node 的 req、res,构造 Koa 的 参数——ctx

  • onerror:错误处理

context.js

主要是做属性和方法的代理,让用户能够更简便的访问到request和response的属性和方法

const util = require('util');
const createError = require('http-errors');
const httpAssert = require('http-assert');
const delegate = require('delegates');

const proto = module.exports = {
  // 省略了一些不甚重要的函数
  onerror(err) {
    // 触发application实例的error事件
    this.app.emit('error', err, this);
  },
};

/*
 在application.createContext函数中,
 被创建的context对象会挂载基于request.js实现的request对象和基于response.js实现的response对象。
 下面2个delegate的作用是让context对象代理request和response的部分属性和方法
*/
delegate(proto, 'response')
  .method('attachment')
  ...
  .access('status')
  ...
  .getter('writable')
  ...;

delegate(proto, 'request')
  .method('acceptsLanguages')
  ...
  .access('querystring')
  ...
  .getter('origin')
  ...;

context.js 中有大量的 delegate 操作,是通过 delegate,可以让 ctx 能够直接访问其上面 response 和 request 中的属性和方法,即可以通过 ctx.xxx 获取到 ctx.request.xxxctx.response.xxx

delegate 是通过 delegates 这个库实现的,通过 proto.__defineGetter__ 和 proto.__defineSetter__ 去代理对象下面节点的属性和方法等。(proto.__defineGetter__proto.__defineSetter__ 现已被 mdn 废弃,改用 Object.defineProperty())

当我们在使用context对象时:

  • ctx.header 是 ctx.request.header 上代理的

  • ctx.body 是 ctx.response.body 上代理的

综上来看,context其实是一个很轻的概念对象,真正干货内容都仍然是在request和response里。

delegate

delegates是第三方npm包,功能就是把一个对象上的方法,属性委托到另一个对象上。

method方法是委托方法,getter方法用来委托getter,access方法委托getter+setter。

下面是源码片段:

function Delegator(proto, target) {
  if (!(this instanceof Delegator)) return new Delegator(proto, target);
  this.proto = proto;
  this.target = target;
  this.methods = [];
  this.getters = [];
  this.setters = [];
  this.fluents = [];
}

Delegator.prototype.method = function(name){
  var proto = this.proto;
  var target = this.target;
  this.methods.push(name);

  proto[name] = function(){
    return this[target][name].apply(this[target], arguments);
  };

  return this;
};

从上面的代码中可以看到,它其实是在proto上新建一个与Request和Response上的方法名一样的函数,然后执行这个函数的时候,这个函数在去Request和Response上去找对应的方法并执行。

简单来个栗子:

var proto = {};

var Request = {
  test: function () {
    console.log('test');
  }
};

var name = 'test';
proto[name] = function () {
  return Request[name].apply(Request, arguments);
};

我们在来看看getter方法

Delegator.prototype.getter = function(name){
  var proto = this.proto;
  var target = this.target;
  this.getters.push(name);

  proto.__defineGetter__(name, function(){
    return this[target][name];
  });

  return this;
};

可以看到,在proto上绑定个getter函数,当函数被触发的时候去,会去对应的request或response中去读取对应的属性,这样request或response的getter同样会被触发~

我们再来看看access:

Delegator.prototype.access = function(name){
  return this.getter(name).setter(name);
};

可以看到,这个方法是getter+setter,getter上面刚说过,setter与getter同理,不多说了。

request.js

对原生的 req 属性做处理,扩展更多可用的属性和方法,比如:query 属性、get 方法:

const IP = Symbol('context#ip')
module.exports = {
	get header() {
		return this.req.headers
	},
	set header(val) {
		this.req.headers = val
	},
	get url() {
		return this.req.url
	},
	set url(val) {
		this.req.url = val
	},
	get method() {
		return this.req.method
	},
	set method(val) {
		this.req.method = val
	},
	get query() {
		const str = this.querystring;
		const c = this._querycache = this._querycache || {};
		return c[str] || (c[str] = qs.parse(str))
	},
	set query(obj) {
		this.querystring = q.stringify(obj)
	},
	get querystring() {
		if (!this.req) return ''
		return parse(this.req).query || ''
	},
	set querystring() {
		const url = parse(this.req)
		if (url.search === `?${str}`) return;

		url.search = str
		url.patch = null

		this.url = stringify(url)
	},
	get socket() {
		return this.socket
	},
	get length() {
		const len = this.get('Content-Length')
		if (len === '') return
		return ~~len
	},
	/**
   * Return request's remote address
   * When `app.proxy` is `true`, parse
   * the "X-Forwarded-For" ip address list and return the first one
   *
   * @return {String}
   * @api public
   */

  get ip() {
    if (!this[IP]) {
      this[IP] = this.ips[0] || this.socket.remoteAddress || '';
    }
    return this[IP];
  },

  set ip(_ip) {
    this[IP] = _ip;
  },
	get(field) {
    const req = this.req;
    switch (field = field.toLowerCase()) {
      case 'referer':
      case 'referrer':
        return req.headers.referrer || req.headers.referer || '';
      default:
        return req.headers[field] || '';
    }
  },
	/**
   * Return JSON representation.
   *
   * @return {Object}
   * @api public
   */

  toJSON() {
    return only(this, [
      'method',
      'url',
      'header'
    ]);
  }
}

request对象基于node原生req封装了一系列便利属性和方法,供处理请求时调用。

所以当你访问ctx.request.xxx的时候,实际上是在访问request对象上的赋值器(setter)和取值器(getter)。

response.js

对原生的 res 属性做处理,扩展更多可用的属性和方法,比如:status 属性、set 方法:

module.exports = {
  // 在application.js的createContext函数中,会把node原生的res作为response对象(即response.js封装的对象)的属性
  // response对象与request对象类似,基于res封装了一系列便利的属性和方法
  get body() {
    return this._body;
  },

  set body(val) {
    // 支持string
    if ('string' == typeof val) {
    }

    // 支持buffer
    if (Buffer.isBuffer(val)) {
    }

    // 支持stream
    if ('function' == typeof val.pipe) {
    }

    // 支持json
    this.remove('Content-Length');
    this.type = 'json';
  },
 }

response对象与request对象类似,就不再赘述。

值得注意的是,返回的body支持Buffer、Stream、String以及最常见的json。

深入理解koa源码

通过上面的阅读,相信对koa有了一个初步认识,下文会从初始化、启动应用、处理请求等的角度,来对这过程中比较重要的细节进行讲解及延伸。

构造函数

首先我们创建了 Koa 的实例 app,其构造函数十分简单,如下:

module.exports = class Application extends Emitter {

  constructor(options) {
    super();
    options = options || {};  //配置
    this.proxy = options.proxy || false;   //是否proxy模式
    this.subdomainOffset = options.subdomainOffset || 2;  //domain要忽略的偏移量
    this.proxyIpHeader = options.proxyIpHeader || 'X-Forwarded-For'; //proxy自定义头部
    this.maxIpsCount = options.maxIpsCount || 0;  //代理服务器数量
    this.env = options.env || process.env.NODE_ENV || 'development';  //环境变量
    if (options.keys) this.keys = options.keys;   // 自定义cookie 密钥
    this.middleware = [];  //中间件数组
    this.context = Object.create(context);
    this.request = Object.create(request);
    this.response = Object.create(response);
    
    if (util.inspect.custom) {   //自定义检查,这里的作用是get app时,去执行this.inspect 。感兴趣可见http://nodejs.cn/api/util.html#util_util_inspect_custom
      this[util.inspect.custom] = this.inspect;
    }
  }
...

1、Application继承自Node.js原生的EventEmitter类,这个类其实就是一个发布订阅模式,可以订阅和发布消息。所以他有些方法如果在application.js里面找不到,那可能就是继承自EventEmitter,比如下图这行代码:

这里有this.on这个方法,看起来他应该是Application的一个实例方法,但是这个文件里面没有,其实他就是继承自EventEmitter,是用来给error这个事件添加回调函数的。这行代码if里面的this.listenerCount也是EventEmitter的一个实例方法。

2、再是一个中间件数组,我们用use使用的中间件都会放进这个数组中

3、最后分别用Object.create拷贝的context、request、response,分别对应koa目录的三个文件。这里用Object.create是因为我们在同一个应用中可能会有多个new Koa的app,为了防止这些app相互污染,用拷贝的方法让其引用不指向同一个地址。

Object.create

Object.create:创建一个新对象,并且将原对象的属性和方法作为新的对象的proto。

this.context = Object.create(context); 
this.request = Object.create(request);
this.response = Object.create(response);

以context为例,这里其实是创建一个新对象,使用context对象来提供新创建对象的proto,并且将这个对象赋值给this.context,实现了类继承的作用。

中间件

在实例化koa之后,接下来,使用app.use传入中间件函数。

app.use(async (ctx,next) => {
    await next();
});

app.use的作用就是添加一个中间件,我们在构造函数里面也初始化了一个变量middleware,用来存储中间件,所以app.use的代码就很简单了,将接收到的中间件塞到这个数组就行:

 use(fn) {
    if (isGeneratorFunction(fn)) {
      fn = convert(fn);
    }
    this.middleware.push(fn);
    return this;
  }

当我们执行app.use的时候,koa做了这2件事情:

  • 判断是否是generator函数,如果是,使用koa-convert做转换。

  • 所有传入use的方法,会被push到middleware中。

注意app.use方法最后返回了this,类的实例方法返回this可以实现链式调用:

app.use(middlewaer1).use(middlewaer2).use(middlewaer3)

为什么会有这种效果呢?因为这里的this其实就是当前实例,也就是app,所以app.use()的返回值就是app,app上有个实例方法use,所以可以继续点app.use().use()。

这里做下延伸讲解,如何将generator函数转为类async函数。

convert源码挺多,核心代码其实是这样的。

function convert(){
 return function (ctx, next) {
    return co.call(ctx, mw.call(ctx, createGenerator(next)))
  }
  function * createGenerator (next) {
    return yield next()
  }
}

最后还是通过co来转换的。所以接下来看co的源码。

co源码

koa2处于对koa1版本的兼容,中间件函数如果是generator函数的话,会使用koa-convert进行转换为“类async函数”。

回忆一下generator的知识:每次执行generator的next函数时,它会返回一个对象:

{ value: xxx, done: false }

返回这个对象后,如果能再次执行next,就可以达到自动执行的目的了。

看下面的例子:

function * gen(){
    yield new Promise((resolve,reject){
        //异步函数1
        if(成功){
            resolve()
        }else{
            reject();
        }
    });
    
    yield new Promise((resolve,reject){
        //异步函数2
        if(成功){
            resolve()
        }else{
            reject();
        }
    })
}
let g = gen();
let ret = g.next();

此时ret = { value: Promise实例; done: false};value已经拿到了Promise对象,那就可以自己定义成功/失败的回调函数了。如:

ret.value.then(()=>{
        g.next();
    })

现在就大功告成啦。我们只要找到一个合适的方法让g.next()一直持续下去就可以自动执行了。

所以问题的关键在于yield的value必须是一个Promise。那么我们来看看co是如何把这些都东西都转化为Promise的:

function co(gen) {
  var ctx = this;  // 把上下文转换为当前调用co的对象
  var args = slice.call(arguments, 1)  // 获取参数

  // we wrap everything in a promise to avoid promise chaining,
  // 不管你的gen是什么,都先用Promise包裹起来
  return new Promise(function(resolve, reject) {
    // 如果gen是函数,则修改gen的this为co中的this对象并执行gen
    if (typeof gen === 'function') gen = gen.apply(ctx, args);

   // 因为执行了gen,所以gen现在是一个有next和value的对象,如果gen不存在、或者不是函数则直接返回gen
    if (!gen || typeof gen.next !== 'function') return resolve(gen);

    // 执行类似上面示例g.next()的代码
    onFulfilled();

  
    function onFulfilled(res) {
      var ret;
      try {
        ret = gen.next(res);  // 执行每一个gen.next()
      } catch (e) {
        return reject(e);
      }
      next(ret);  //把执行得到的返回值传入到next函数中,next函数是自动执行的关键
    }


    function onRejected(err) {
      var ret;
      try {
        ret = gen.throw(err);
      } catch (e) {
        return reject(e);
      }
      next(ret);
    }

    /**
     * Get the next value in the generator,
     * return a promise.
     */
    function next(ret) {
      // 如果ret.done=true说明迭代已经完毕,返回最后一次迭代的value
      if (ret.done) return resolve(ret.value);

      // 无论ret.value是什么,都转换为Promise,并且把上下文指向ctx
      var value = toPromise.call(ctx, ret.value);

      // 如果value是一个Promise,则继续在then中调用onFulfilled。相当于从头开始!!
      if (value && isPromise(value)) return value.then(onFulfilled, onRejected);

      return onRejected(new TypeError('You may only yield a function, promise, generator, array, or object, '
        + 'but the following object was passed: "' + String(ret.value) + '"'));
    }
  });
}

从上面代码可以得到这样的结论,co的思想其实就是:

把一个generator封装在一个Promise对象中,然后再这个Promise对象中再次把它的gen.next()也封装出Promise对象,相当于这个子Promise对象完成的时候也重复调用gen.next()。当所有迭代完成时,把父Promise对象resolve掉。这就成了一个类async函数了。

启动应用

当执行完app.use时,服务还没启动,只有当执行到app.listen(3000)时,程序才真正启动。

app.listen的作用是用来启动服务器,要启动服务器需要调用原生的http.createServer,所以这个方法就是用来调用http.createServer的。

listen(...args) {
    const server = http.createServer(this.callback());
    return server.listen(...args);
  }

这个方法本身其实没有太多可说的,只是调用http模块启动服务而已,主要的逻辑都在this.callback()里面了。

app.callback

this.callback() 执行的结果肯定是一个函数,根据我们基础知识,这个函数无非就是根据 req 获取信息,同时向 res 中写入数据而已。

那么这个函数具体是怎么做的呢?首先,它基于 req 和 res 封装出我们中间件所使用的 ctx 对象,再将 ctx 传递给中间件所组合成的一个嵌套函数。中间件组合的嵌套函数返回的是一个 Promise 的实例,等到这个组合函数执行完( resolve ),通过 ctx 中的信息(例如 ctx.body )想 res 中写入数据,执行过程中出错 (reject),这调用默认的错误处理函数。

原理还是很简单,看一下代码:

callback() {
    // compose处理所有中间件函数。洋葱模型实现核心
    const fn = compose(this.middleware);

    // 每次请求执行函数(req, res) => {}
    const handleRequest = (req, res) => {
      // 基于req和res封装ctx
      const ctx = this.createContext(req, res);
      // 调用handleRequest处理请求
      return this.handleRequest(ctx, fn);
    };

    return handleRequest;
  }

 handleRequest(ctx, fnMiddleware) {
    const res = ctx.res;
    res.statusCode = 404;

    // 调用context.js的onerror函数
    const onerror = err => ctx.onerror(err);

    // 处理响应内容
    const handleResponse = () => respond(ctx);

    // 确保一个流在关闭、完成和报错时都会执行响应的回调函数
    onFinished(res, onerror);

    // 中间件执行、统一错误处理机制的关键
    return fnMiddleware(ctx).then(handleResponse).catch(onerror);
  }

这个方法先用koa-compose将中间件都合成了一个函数fn,然后在http.createServer的回调里面使用req和res创建了一个Koa常用的上下文ctx,然后再调用this.handleRequest来真正处理网络请求。注意这里的this.handleRequest是个实例方法,和当前方法里面的局部变量handleRequest并不是一个东西。这几个方法我们一个一个来看下。

洋葱模型

在上图中,洋葱内的每一层都表示一个独立的中间件,用于实现不同的功能,比如异常处理、缓存处理等。每次请求都会从左侧开始一层层地经过每层的中间件,当进入到最里层的中间件之后,就会从最里层的中间件开始逐层返回。因此对于每层的中间件来说,在一个 请求和响应 周期中,都有两个时机点来添加不同的处理逻辑。

koa的中间件是真正的洋葱模型,能针对request和response拦截,而Express只能针对request,严格的说Express里的只能叫做请求拦截器(但不是说在中间件执行之后不能再执行任何代码了)。

举个例子

我们先举个例子说明一下什么是洋葱模型:

function wait(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms || 1));
}

const arr = [];
const stack = [];

// type Middleware<T> = (context: T, next: Koa.Next) => any;
stack.push(async (context, next) => {
  arr.push(1);
  await wait(1);
  await next();
  await wait(1);
  arr.push(6);
});

stack.push(async (context, next) => {
  arr.push(2);
  await wait(1);
  await next();
  await wait(1);
  arr.push(5);
});

stack.push(async (context, next) => {
  arr.push(3);
  await wait(1);
  await next();
  await wait(1);
  arr.push(4);
});

await compose(stack)({});

对于以上的代码,我们希望执行完 compose(stack)({}) 语句之后,数组 arr 的值为 [1, 2, 3, 4, 5, 6]。这里我们先不关心 compose 函数是如何实现的。我们来分析一下,如果要求数组 arr 输出期望的结果,上述 3 个中间件的执行流程:

1.开始执行第 1 个中间件,往 arr 数组压入 1,此时 arr 数组的值为 [1],接下去等待 1 毫秒。为了保证 arr 数组的第 1 项为 2,我们需要在调用 next 函数之后,开始执行第 2 个中间件。

2.开始执行第 2 个中间件,往 arr 数组压入 2,此时 arr 数组的值为 [1, 2],继续等待 1 毫秒。为了保证 arr 数组的第 2 项为 3,我们也需要在调用 next 函数之后,开始执行第 3 个中间件。

3.开始执行第 3 个中间件,往 arr 数组压入 3,此时 arr 数组的值为 [1, 2, 3],继续等待 1 毫秒。为了保证 arr 数组的第 3 项为 4,我们要求在调用第 3 个中间的 next 函数之后,要能够继续往下执行。

4.当第 3 个中间件执行完成后,此时 arr 数组的值为 [1, 2, 3, 4]。因此为了保证 arr 数组的第 4 项为 5,我们就需要在第 3 个中间件执行完成后,返回第 2 个中间件 next 函数之后语句开始执行。

5.当第 2 个中间件执行完成后,此时 arr 数组的值为 [1, 2, 3, 4, 5]。同样,为了保证 arr 数组的第 5 项为 6,我们就需要在第 2 个中间件执行完成后,返回第 1 个中间件 next 函数之后语句开始执行。

6.当第 1 个中间件执行完成后,此时 arr 数组的值为 [1, 2, 3, 4, 5, 6]。

为了更直观地理解上述的执行流程,我们可以把每个中间件当做 1 个大任务,然后在以 next 函数为分界点,在把每个大任务拆解为 3 个 beforeNext、next 和 afterNext 3 个小任务。

在上图中,我们从中间件一的 beforeNext 任务开始执行,然后按照紫色箭头的执行步骤完成中间件的任务调度。

koa-compose源码

koa-compose虽然被作为了一个单独的库,但是他的作用却很关键,所以我们也来看看他的源码吧。koa-compose的作用是将一个中间件组成的数组合并成一个方法以便外部调用。我们先来回顾下一个Koa中间件的结构:

function middleware(ctx, next) {}

这个数组就是有很多这样的中间件:

[
  function middleware1(ctx, next) {},
  function middleware2(ctx, next) {}
]

Koa的合并思路并不复杂,就是让compose再返回一个函数,返回的这个函数会开始这个数组的遍历工作:

function compose(middleware) {
  // 参数检查,middleware必须是一个数组
  if (!Array.isArray(middleware))
    throw new TypeError("Middleware stack must be an array!");
  // 数组里面的每一项都必须是一个方法
  for (const fn of middleware) {
    if (typeof fn !== "function")
      throw new TypeError("Middleware must be composed of functions!");
  }

  // 返回一个方法,这个方法就是compose的结果
  // 外部可以通过调用这个方法来开起中间件数组的遍历
  // 参数形式和普通中间件一样,都是context和next
  return function (context, next) {
    let index = -1
    return dispatch(0); // 开始中间件执行,从数组第一个开始

    // 执行中间件的方法
    function dispatch(i) {
      // 一个函数中多次调用报错
      // await next()
      // await next()
      if (i <= index) return Promise.reject(new Error('next() called multiple times'))
      index = i

      let fn = middleware[i]; // 取出需要执行的中间件

      // 如果i等于数组长度,说明数组已经执行完了
      if (i === middleware.length) {
        fn = next; // 这里让fn等于外部传进来的next,其实是进行收尾工作,比如返回404
      }

      // 如果外部没有传收尾的next,直接就resolve
      if (!fn) {
        return Promise.resolve();
      }

      // 执行中间件,注意传给中间件接收的参数应该是context和next
      // 传给中间件的next是dispatch.bind(null, i + 1)
      // 所以中间件里面调用next的时候其实调用的是dispatch(i + 1),也就是执行下一个中间件
      try {
        return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
      } catch (err) {
        return Promise.reject(err);
      }
    }
  };
}

上面代码主要的逻辑就是这行:

return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));

这里的fn就是我们自己写的中间件,我们稍微改下看得更清楚:

const logger = async (ctx, next) => {
  const start = Date.now();
  await next();
  const ms = Date.now() - start;
  console.log(`${ctx.method} ${ctx.url} - ${ms}ms`);
};

app.use(logger);

那我们compose里面执行的其实是:

logger(context, dispatch.bind(null, i + 1));

也就是说logger接收到的next其实是dispatch.bind(null, i + 1),你调用next()的时候,其实调用的是dispatch(i + 1),这样就达到了执行数组下一个中间件的效果。

另外由于中间件在返回前还包裹了一层Promise.resolve,所以我们所有自己写的中间件,无论你是否用了Promise,next调用后返回的都是一个Promise,所以你可以使用await next()。

next 函数不能调用多次

这一块对应的则是:

index = -1
dispatch(0)
function dispatch (i) {
  if (i <= index) return Promise.reject(new Error('next() called multiple times'))
  index = i
}

调用多次后 i <= index,所以会报错。

如果中间件中的next()方法报错了怎么办

ctx.onerror = function {
  this.app.emit('error', err, this);
};

  listen(){
    const  fnMiddleware = compose(this.middleware);
    if (!this.listenerCount('error')) this.on('error', this.onerror);
    const onerror = err => ctx.onerror(err);
    fnMiddleware(ctx).then(handleResponse).catch(onerror);
  }

  onerror(err) {
    // 代码省略
    // ...
  }

答:中间件链错误会由ctx.onerror捕获,该函数中会调用this.app.emit(‘error’, err, this)(因为koa继承自events模块,所以有’emit’和on等方法),可以使用app.on(‘error’, (err) => {}),或者app.onerror = (err) => {}进行捕获。

app.createContext

上面用到的this.createContext也是一个实例方法。这个方法根据http.createServer传入的req和res来构建ctx这个上下文,官方源码长这样:

  createContext(req, res) {
    const context = Object.create(this.context); // 创建一个对象,使之拥有context的原型方法,后面以此类推
    const request = context.request = Object.create(this.request);
    const response = context.response = Object.create(this.response);
    context.app = request.app = response.app = this;
    context.req = request.req = response.req = req;
    context.res = request.res = response.res = res;
    request.ctx = response.ctx = context;
    request.response = response;
    response.request = request;
    context.originalUrl = request.originalUrl = req.url;
    context.state = {};
    return context;
  }

一大串的赋值骚操作,我们细细解读一下:

1、先通过 Object.create(),创建了新的从 context.js、request.js、response.js 引入的对象,防止引入的原始对象被污染。

2、通过 context.request = Object.create(this.request) 和 context.response = Object.create(this.response) 将 request 和 response 对象挂载到了 context 对象上。这部分对应了 context.js 中delegate 的委托部分,能让 ctx 直接通过 ctx.xxx 去访问到 ctx.request.xxxctx.response.xxx

3、通过一系列的赋值操作,将原始的 http 请求的 res 和 req,以及 Koa 实例app 等等分别挂载到了 context、request 和 response 对象中,以便于在 context.js、request.js 和response.js 中针对原始的请求、相应参数等做一些系列的处理访问,便于用户使用

最终这段代码执行后的关系图如下:

这里需要说明下

  • request - request继承于Request静态类,包含操作request的一些常用方法

  • response - response继承于Response静态类,包含操作response的一些常用方法

  • req - nodejs原生的request对象

  • res - nodejs原生的response对象

  • app - koa的原型对象

又由于这段代码的执行时间是接受请求的时候,所以表明每一次接受到请求,都会为该请求生成一个新的上下文。换个角度思考,每次回调过来都是创建新的context、request和response实例,这样本质上也对应了HTTP是无状态的

app.handleRequest

callback 中执行完 createContext 后,会将创建好的 ctx 以及合并中间件后生成的顺序执行函数传给 handleRequest 并执行该函数。

handleRequest 中会通过 onFinished 这个方法监听 res,当 res 完成、关闭或者出错时,便会执行 onerror 回调。之后返回中间件执行的结果,当中间件全部执行完之后,执行 respond 进行数据返回操作。

  handleRequest(ctx, fnMiddleware) {
    const res = ctx.res;
    res.statusCode = 404;
    const onerror = err => ctx.onerror(err);
    const handleResponse = () => respond(ctx);

    onFinished(res, onerror);
    return fnMiddleware(ctx).then(handleResponse).catch(onerror);
  }

这里简单判断了下statusCode是否404,onFinished处理res为stream时情况。 最后调用fnMiddleware函数,传入ctx,resove则进入respond,reject则进入ctx.onerror,最终返回结果。

respond

我们在写koa的时候,会发现所有的response操作都是 this.body = xxx; this.status = xxxx;这样的语法,但如果对原生nodejs有了解的童鞋知道,nodejs的response只有一个api那就是res.end();,而设置status状态码什么的都有不同的api,那么koa是如何做到通过this.xxx = xxx来设置response的呢?

从图中看到,request请求是以respond结束的。

是滴,所有的request请求都是以respond这个函数结束的,这个函数会读取this.body中的值根据不同的类型来决定以什么类型响应请求

respond是一个辅助方法,并不在Application类里面,他要做的就是将网络请求返回:

function respond(ctx) {
	// allow bypassing koa
	if (false === ctx.respond) return

	if (!ctx.writable) return

	const res = ctx.res
	let body = ctx.body
	const code = ctx.status;

	// statuses.empty(code) returns true if a status code expects an empty body
	if (statuses.empty[code]) {
		// strip headers
		ctx.body = null
		return res.end()
	}

	if ('HEAD' === ctx.method) {
		if (!res.headersSent && !ctx.response.has('Content-Length')) {
			const { length } = ctx
			if (Number.isInteger(length)) ctx.length = length
		}
		return res.end()
	}

	// status body
	if (null === body) {
		if (ctx.reponse._expliciNullBody) {
			ctx.response.remove('Content-Type')
			ctx.response.remove('Transfer-Encoding')
			return res.end()
		}

		if (ctx.req.httpVersionMajaor >= 2) {
			body = String(code)
		} else {
			body = ctx.message || String(code)
		}
		if (!res.headersSend) {
			ctx.type = 'text'
			ctx.length = Buffer.byteLength(body)
		}
		return res.end(body)
	}

	// responses
	if (Buffer.isBuffer(body)) return res.end(body)
	if ('string' === typeof body) return res.end(body)
	if (body instanceof Stream) return body.pipe(res)

	// body: json
	body = JSON.stringify(body)
	if (!res.headersSent) {
		ctx.length = Buffer.byteLength(body)
	}
	res.end(body)
}

respond这个方法里内容稍微有点长,它做的是ctx返回不同情况的处理,如method为head时加上content-length字段、body为空时去除content-length等字段,返回相应状态码、body为Stream时使用pipe等。

仔细阅读的童鞋会发现,咦,,,,为毛没有设置status和header等信息的代码逻辑?这不科学啊。我分明记得状态码是rs.statusCode = 400这样设置的,为啥代码中没有??

这就要从最开始的上下文说起了。为什么Response静态类中添加req和res属性?就是因为添加了req和res之后,response和request类就可以直接操作req和res啦。。我们看一段源码就明白了

set status(code) {
  assert('number' == typeof code, 'status code must be a number');
  assert(statuses[code], 'invalid status code: ' + code);
  this._explicitStatus = true;
  this.res.statusCode = code;
  this.res.statusMessage = statuses[code];
  if (this.body && statuses.empty[code]) this.body = null;
},

主要是this.res.statusCode = code; this.res.statusMessage = statuses[code];这两句,statusCode和statusMessage都是nodejs原生api。

错误处理

koa框架提供了一种集中式的错误处理机制,只需让koa实例监听error事件,则所有中间件代码逻辑的错误都可以在该回调函数中统一处理,如下

app.on('error', err => {
  log.error('server error', err)
});

其实这里可以说只是把error打印了出来。但是对于中间件内的异步错误,koa是无法捕捉的(除非转同步)。我们的应用如果需要记录这个错误可以用node的process监听

process.on("unhandledRejection", (err) => {
  console.log(err);
});

这是如何做到的呢? 核心代码如下: 第一部分:application.js:

handleRequest(ctx, fnMiddleware) {
    const res = ctx.res;
    res.statusCode = 404;
    // application.js也有onerror函数,但这里使用了context的onerror,
    const onerror = err => ctx.onerror(err);
    const handleResponse = () => respond(ctx);
    onFinished(res, onerror);

    // 这里是中间件如果执行出错的话,都能执行到onerror的关键!!!
    return fnMiddleware(ctx).then(handleResponse).catch(onerror);
  }

看完会有两个疑问:

1,出错执行的回调函数是context.js中的onerror函数,为什么在application实例上监听error事件,就能处理所有中间件中的错误呢? 请看context.js中onerror函数的精简版: context.js:

onerror(err) {
    this.app.emit('error', err, this);
}

其中this.app即是对application的引用,当context.js的onerror触发时,会触发application实例的error事件。该事件机制是基于“Application类继承自EventEmitter”这一事实。

2,如何做到集中处理所有中间件的错误?中间件洋葱式调用的实现逻辑如下:

function compose (middleware) {
  return function (context, next) {
    let index = -1
    return dispatch(0)

    function dispatch (i) {
      if (i <= index) return Promise.reject(new Error('next() called multiple times'))
      index = i
      let fn = middleware[i]
      if (i === middleware.length) fn = next
      if (!fn) return Promise.resolve()
      try {
        return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
      } catch (err) {
        return Promise.reject(err)
      }
    }
  }
}

还有外部处理:

// 这里是中间件如果执行出错的话,都能执行到onerror的关键!!!
return fnMiddleware(ctx).then(handleResponse).catch(onerror);

主要涉及这几个知识点:

  • async函数返回一个Promise对象

  • async函数内部抛出错误,会导致Promise对象变为reject状态。抛出的错误会被catch的回调函数(上面为onerror)捕获到。

  • await命令后面的Promise对象如果变为reject状态,reject的参数也可以被catch的回调函数(上面为onerror)捕获到。

这样就可以理解为什么koa能实现异步函数的统一错误处理了。

总结起来,我们可以在不同的抽象层次上处理错误。比如,我们可以在顶层的中间件将所有中间件产生的错误捕获并处理了,这样错误就不会被上层捕获。我们也可以覆盖 ctx.onerror 的方式来捕获所有的异常,而且可以不触发 app 的 error 事件。最后我们当然也可以直接监听 app 的 error 事件的方式来处理错误。