Node.js源码解析--net.js模块

21 February 2017

net.js socket

net.js模块是Node.js异步网络中最重要的一部分。 相信大多数读者都使用过net.js模块。

前言

那net.js模块它底层到底做了什么,让我们有异步网络的能力。反过来问就是说:net.js底层通过什么来实现异步网络。

这篇文章并不会告诉你如何的使用net.js模块。我希望通过这篇文章告诉你,net.js模块是如何实现这个异步网络。

1. 背景知识

我们知道Node.js的大部分能力都是由它的c/c++模块提供的,net.js模块也不例外。

那么net.js关键的知识就是:

  1. TCP传输控制协议
  2. Socket套接字编程

1.1 TCP

提供的是面向连接、可靠的字节流服务。当客户和服务器彼此交换数据前,必须先在双方之间建立一个TCP连接,之后才能传输数据。 TCP提供超时重发,丢弃重复数据,检验数据,流量控制等功能,保证数据能从一端传另一端.

1.2 Socket

像TCP/IP、TCP和UDP等协议只有一套,而我们系统多个TCP连接或多个应用程序进程必须通过同一个TCP协议端口传输数据。 为了区别不同的应用程序进程和连接,许多计算机操作系统为应用程序与TCP/IP协议交互提供了称为套接字(Socket)的接口。

套接字就是支持TCP/IP网络通信的基本操作单元,是我们进行TCP/IP进行通信的接口。

2.从创建一个TCP服务器说起

我们可以很轻松的创建一个TCP服务器和客户端。

创建一个 TCP 服务器:

//server.js
const net = require('net');
var server = net.createServer((socket)=>{
  socket.on('data', (data) => {
    console.log('client send message: ' , data.toString());
  });
  socket.write('hello client!');
});
server.listen(8888,'127.0.0.1' , ()=>{
  console.log(server.address());
});

应答服务器的客户端:

//client.js
const net = require('net');
var client = net.connect({port: 8888, host: '127.0.0.1'}, function() {
    client.write('hello server!\r\n');
});
client.on('data', (data) => {
  console.log('server send message: ' ,data.toString());
  client.end();
});
client.on('end', () => {
  console.log('disconnected from server');
});

使用cmd分别启动TCP服务器和客户端,这样就可以与服务器通讯了。

node server.js
node client.js

一次最简单的通讯:

  1. TCP服务器端依次调用Socket()、bind()、listen()之后,就会监听指定的socket地址了。
  2. TCP客户端依次调用Socket()、connect()之后就向TCP服务器发送了一个连接请求。
  3. TCP服务器监听到这个请求之后,就会调用accept()函数去接收请求,这样连接就建立好了。之后就可以开始网络I/O操作了,即类同于普通文件的读写I/O操作。

万变不离其宗,Node.js的TCP通讯也是在C的基础上搭建起来的。

先来看看C编写的TCP服务是怎么样的吧。

TCP

通过gcc命令分别编译socket.c和socket_client.c文件。然后分别执行编译后的可执行文件socket_server 和 socket_client。这时候TCP服务器和TCP客户端就建立好连接了。

gcc socket.c -o ./socket_server
gcc socket_client.c -o ./socket_client

通过命令行直接启动TCP服务器和客户端即可进行TCP通讯。

socket_server
socket_client ip //ip为你TCP服务器当前的ip地址。如果是本机测试可以使用ifconfig查询ip,本案例默认服务器端口为6666

那既然都知道了TCP的底层是这样实现的,那如何把底层TCP服务提供给Node.js使用呢。

3 TCP服务器端

首先TCP服务器端会创建一个socket;然后bind系统调用把addr中的地址分配给与描述符socket关联的未命名套接字,地址结构的长度由addr_len指定, 通过socket调用创建的套接字必须经过命名(绑定地址)后才能使用;然后调用listen()来监听这个socket。

TCP

从图中可以清晰的看出,服务器端与客户端的区别在于,服务器端监听端口等待连接请求,而客户端则是请求连接。

回到net.js模块是如何实现这个TCP服务器的创建的呢。

首当其冲的就是

3.1 net.Server对象

看上面的TCP服务器创建代码,你可能会以为net.createServer()函数调用之后就会创建一个socket实例对象。 其实不是的,net.createServer()只是用来初始化参数的,包含连接成功后的回调函数。

//net.js
function Server(options, connectionListener) {
  EventEmitter.call(this);
  this.on('connection', connectionListener);
  this._handle = null;
  this._usingSlaves = false;
  this._slaves = [];
  this._unref = false;
  this.allowHalfOpen = options.allowHalfOpen || false;
  this.pauseOnConnect = !!options.pauseOnConnect;
}

Server对象,最主要的参数是_handle,这个参数保存着服务器的socket。 那么何时才创建socket呢。答案在listen函数。

与net.createServer相同

3.2 Server.prototype.listen原型方法

整个socket的创建是从这里开始的,listen函数主要对传入的参数进行判断以及补全。

  //net.js  [Function Server.prototype.listen]
  var [options, cb] = normalizeArgs(args);
  //...
  if (options instanceof TCP) { //tcp
    this._handle = options;
    listen(this, null, -1, -1, backlog);
  } else if (typeof options.fd === 'number' && options.fd >= 0) {//根据fd创建
    listen(this, null, null, null, backlog, options.fd);
  } else {
    backlog = options.backlog || backlog;

    if (typeof options.port === 'number' || typeof options.port === 'string' ||
        (typeof options.port === 'undefined' && 'port' in options)) {
      assertPort(options.port);
      if (options.host) {// eq: 127.0.0.1 : 80
        //检测ip 并监听
        lookupAndListen(this, options.port | 0, options.host, backlog,
                        options.exclusive);
      } else {
        listen(this, null, options.port | 0, 4, backlog, undefined,
               options.exclusive);
      }
      //创建管道pipe
    } else if (options.path && isPipeName(options.path)) {
      // UNIX socket or Windows pipe.
      const pipeName = this._pipeName = options.path;
      listen(this, pipeName, -1, -1, backlog, undefined, options.exclusive);
    } 
  }

首先会通过对传入的参数使用normalizeArgs函数进行序列化。backlog表示等待TCP连接的队列大小。

传参情况:

  • 1.如果传入的直接是一个TCP对象,那么会直接调用c/c++原生模块进行监听。
  • 2.如果传入的是一个文件描述符,那么会先判断文件描述符的类型,然后创建对应的服务器类型。
  • 3.如果没有符合以上的1,2.有可能符合以下情况:
    • 3.1根据ip和端口创建TCP服务器。
    • 3.2根据optins.path参数创建Pipe管道。

3.2.1 normalizeArgs函数

函数返回的options是一个对象,cb就是创建完TCP服务器后的回调函数。

function normalizeArgs(args) {
  var options = {};

  if (args.length === 0) {
    return [options];
  } else if (args[0] !== null && typeof args[0] === 'object') {
    // connect(options, [cb])
    options = args[0];
  } else if (isPipeName(args[0])) {
    // connect(path, [cb]);
    options.path = args[0];
  } else {
    // connect(port, [host], [cb])
    options.port = args[0];
    if (args.length > 1 && typeof args[1] === 'string') {
      options.host = args[1];
    }
  }

  var cb = args[args.length - 1];
  if (typeof cb !== 'function')
    cb = null;
  return [options, cb];
}

3.2.2 lookupAndListen函数

如果我们传入了ip地址(上面创建TCP服务器就传入的ip),使用dns模块查询传入的ip地址的ip族。

require('dns').lookup(address, function doListening(err, ip, addressType) {
    addressType = ip ? addressType : 4;
    listen(self, ip, port, addressType, backlog, undefined, exclusive);//监听,回归正常流程
  }
});

3.3 listen函数

这个普通函数的工作就是判断到底是使用哪条进程来创建这个服务器。

//net.js [function listen]
if (!cluster) cluster = require('cluster');
//主进程 或者 独享一个进程和端口
if (cluster.isMaster || exclusive) {
  self._listen2(address, port, addressType, backlog, fd);
  return;
}
//工作线程
cluster._getServer(self, {
  address: address,
  port: port,
  addressType: addressType,
  fd: fd,
  flags: 0
}, cb);

function cb(err, handle) {
  self._handle = handle;
  self._listen2(address, port, addressType, backlog, fd);
}

成功确认使用哪条进程创建服务后,就要开始创建服务并且监听。

3.4 Server.prototype._listen2 函数

创建服务器net.js模块的工作都头了,现在剩下正在核心的部分应该让c/c++模块来实现了。

//net.js [function Server.prototype._listen2]
if (!address && typeof fd !== 'number') {
  this._handle = createServerHandle('::', port, 6, fd);//创建服务器句柄

if ( this._handle === null)//一般创建TCP服务器的情况
  this._handle = createServerHandle(address, port, addressType, fd);//创建服务器句柄

this._handle.onconnection = onconnection;

_listen(this._handle, backlog);//执行c++模块,打通服务

这个函数里面最重要的就只有3句话。

3.4.1 createServerHandle 函数

这个函数会返回服务器句柄。

/**
 * [createServerHandle 创建服务器句柄]
 * @param  {[type]} address     [ip地址]
 * @param  {[type]} port        [端口号]
 * @param  {[type]} addressType [ip类型]
 * @param  {[type]} fd          [文件描述符]
 * @return {[type]}             [description]
 */
function createServerHandle(address, port, addressType, fd) {
  //文件描述符
  if (typeof fd === 'number' && fd >= 0) {
      handle = createHandle(fd);
  } else if (port === -1 && addressType === -1) {//管道
    handle = new Pipe();
  } else { //TCP服务
    handle = new TCP();
  }
  if (!address) {
    err = handle.bind6('::', port);
    if (err) {
      handle.close();
      // Fallback to ipv4
      return createServerHandle('0.0.0.0', port);
    }
  } else if (addressType === 6) {
    err = handle.bind6(address, port);//ip6
  } else {
    err = handle.bind(address, port);//ip4
  }
  return handle;
}

PipeTCP , TTYWrap这3个对象都是由c/c++模块提供的。我们很清楚的看出createServerHandle函数。 正常情况下已经完成服务器的创建已经端口绑定。

/**
 * [createHandle 创建句柄]
 * @param  {[type]} fd [文件描述符]
 * @return {[type]}    [description]
 */
function createHandle(fd) {
  //判断文件描述符类型
  var type = TTYWrap.guessHandleType(fd);
  if (type === 'PIPE') return new Pipe();
  if (type === 'TCP') return new TCP();
}

c/c++模块中guessHandleType会根据文件描述符fd来判断类型:

//tty_wrap.cc [GuessHandleType]
uv_handle_type t = uv_guess_handle(fd);
const char* type = nullptr;
switch (t) {
case UV_TCP: type = "TCP"; break;
case UV_TTY: type = "TTY"; break;
case UV_UDP: type = "UDP"; break;
case UV_FILE: type = "FILE"; break;
case UV_NAMED_PIPE: type = "PIPE"; break;
case UV_UNKNOWN_HANDLE: type = "UNKNOWN"; break;
default:
  ABORT();
}

3.4.2 _listen函数

_listen 执行c++模块创建底层的服务。

handle.listen(backlog || 511);
//tcp_wrap.cc
int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_),
                      backlog,
                      OnConnection);
//stream.c
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;
  switch (stream->type) {
  case UV_TCP:
    err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;
  case UV_NAMED_PIPE:
    err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;
  default:
    err = -EINVAL;
  }
  if (err == 0)
    uv__handle_start(stream);
  return err;
}

好咯,到这里服务器服务要做的都做完了,就等着客户端发起请求连接。

3.4.3 onconnection函数

当客户端发起请求连接,服务器接受请求后就会执行这个函数。创建Socket对象,连接客户端。

/**
 * [onconnection 连接成功处理函数]
 * 这个函数是给c++调用的
 * @param  {[type]} err          [description]
 * @param  {[type]} clientHandle [description]
 * @return {[type]}              [description]
 */
function onconnection(err, clientHandle) {
  var handle = this;
  var self = handle.owner;
  //创建socket对象
  var socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: self.allowHalfOpen,
    pauseOnCreate: self.pauseOnConnect
  });
  //触发connection函数,也就是我们使用net.createServer时传入的函数
  self.emit('connection', socket);
}

这个的net.js部分是没有代码显示的调用this._handle.onconnection函数的。这部分是c/c++模块做的。 答案就在3.4.2的uv_listen函数的OnConnection函数中。

//connection_wrap.cc
void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle,
                                                    int status) {
  WrapType* wrap_data = static_cast<WrapType*>(handle->data);
  if (status == 0) {
    Local<Object> client_obj = WrapType::Instantiate(env, wrap_data);//客户端对象
    uv_stream_t* client_handle =
        reinterpret_cast<uv_stream_t*>(&wrap->handle_);//客户端对象句柄
    if (uv_accept(handle, client_handle))//接受客户端请求
      return;
    argv[1] = client_obj;// 接受请求成功,执行js那边的回调。
  }
  //执行onconnection 函数
  wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);
}

接下来就是socket的事了。

4 TCP 客户端

Socket对象创建TCP客户端。主要包含2个方法和一个事件监听器。

通过net.connect创建Socket实例对象。

  • 1.write方法,往socket的另一端写入数据;
  • 2.end方法,结束socket;
  • 3.on(‘data’,func)事件监听,数据接收。

4.1 write方法

我们先来看看它们的继承关系。

TCP

Socket.prototype.write的原型方法其实是使用的stream.Duplex.prototype.write的原型方法, 而stream.Duplexwrite方法其实又是来自Writable.prototype.write的原型方法。 所以我们最终调用的是Writeable的原型方法。

//net.js 
Socket.prototype.write = function(chunk, encoding, cb) {
  return stream.Duplex.prototype.write.apply(this, arguments);
};

当我们执行socket.write('send to server')时,会执行writeOrBuffer方法。

//stram_writeable.js
Writable.prototype.write = function(chunk, encoding, cb) {
  //...
  if (state.ended)
    writeAfterEnd(this, cb);
  else if (validChunk(this, state, chunk, cb)) {
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }
  return ret;
};

writeOrBuffer方法会进行参数判断和配置。一切正常就会执行doWrite方法。

//stram_writeable.js
function writeOrBuffer(stream, state, chunk, encoding, cb) {
  chunk = decodeChunk(state, chunk, encoding);//解析数据

  if (chunk instanceof Buffer)
    encoding = 'buffer';
  var len = state.objectMode ? 1 : chunk.length;

  if (state.writing || state.corked) {
    var last = state.lastBufferedRequest;
    state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
    if (last) {
      last.next = state.lastBufferedRequest;
    } else {
      state.bufferedRequest = state.lastBufferedRequest;
    }
    state.bufferedRequestCount += 1;
  } else {//一切正常
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }

  return ret;
}

doWrite表示,到了这一步,就要开始往socket的另一端开始写数据了,stream对象就是socket对象。

//stram_writeable.js
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  //...
  if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

Socket原型有_writev_write方法,都是Socket.prototype._writeGeneric方法的不同参数调用而已。

//net.js
/**
 * [_writeGeneric 往客户端写数据]
 * @param  {[type]}   writev   [description]
 * @param  {[type]}   data     [数据]
 * @param  {[type]}   encoding [编码]
 * @param  {Function} cb       [回调]
 * @return {[type]}            [description]
 */
Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
  //...
  var req = new WriteWrap();//创建写请求
  req.handle = this._handle;
  req.oncomplete = afterWrite;
  req.async = false;
  var err;
  if (writev) {
    var chunks = new Array(data.length << 1);
    for (var i = 0; i < data.length; i++) {
      var entry = data[i];
      chunks[i * 2] = entry.chunk;
      chunks[i * 2 + 1] = entry.encoding;
    }
    err = this._handle.writev(req, chunks);//数据传输
    // Retain chunks
    if (err === 0) req._chunks = chunks;
  } else {
    var enc;
    if (data instanceof Buffer) {
      enc = 'buffer';
    } else {
      enc = encoding;
    }
    err = createWriteReq(req, this._handle, data, enc);//发送数据
  }
  if (req.async && this._handle.writeQueueSize != 0)//异步
    req.cb = cb;
  else
    cb();
};

createWriteReq就是创建写数据请求,通过C/C++核心模块的stream对象进行写数据。 到这里JavaScript模块的工作就完成了,剩下的就交给C/C++完成。

//net.js
/**
 * [createWriteReq 数据传输]
 * @param  {[type]} req      [请求对象]
 * @param  {[type]} handle   [句柄]
 * @param  {[type]} data     [数据]
 * @param  {[type]} encoding [编码]
 * @return {[type]}          [description]
 */
function createWriteReq(req, handle, data, encoding) {
  switch (encoding) {
    case 'latin1':
    case 'binary':
      return handle.writeLatin1String(req, data);
    case 'buffer':
      return handle.writeBuffer(req, data);
    case 'utf8':
    case 'utf-8':
      return handle.writeUtf8String(req, data);
    case 'ascii':
      return handle.writeAsciiString(req, data);
    case 'ucs2':
    case 'ucs-2':
    case 'utf16le':
    case 'utf-16le':
      return handle.writeUcs2String(req, data);
    default:
      return handle.writeBuffer(req, Buffer.from(data, encoding));
  }
}

createWriteReq函数会根据encoding来调用不同的C/C++模块的方法。 handle.writeBuffer等方法都可以在stream_base.cc里面找到。让我们来看看C/C++部分如何实现的。

//stream_wrap.cc
int StreamWrap::DoWrite(WriteWrap* w,uv_buf_t* bufs,size_t count,uv_stream_t* send_handle) {
  int r;
  if (send_handle == nullptr) {
    r = uv_write(w->req(), stream(), bufs, count, AfterWrite);
  } else {
    r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite);
  }
  return r;
}

好吧!已经进入了调用libuv函数的地步了。追到最后发现了sendmsg函数,没错已经算到头了。

//deps/uv/src/unix/stream.c
  do {
    n = sendmsg(uv__stream_fd(stream), &msg, 0);
  }
#if defined(__APPLE__)
  while (n == -1 && (errno == EINTR || errno == EPROTOTYPE));
#else
  while (n == -1 && errno == EINTR);
#endif

4.1.1 sendmsg()函数

sendmsg()用来将数据由指定的socket传给对方主机.

4.2 end方法

我们先来看看Node.js官方文档是怎么描述这个方法的。

Half-closes the socket. i.e., it sends a FIN packet. It is possible the server will still send some data.

If data is specified, it is equivalent to calling socket.write(data, encoding) followed by socket.end().

关闭socket的一半,及变成半双工模式,发送FIN报文。但服务器依然可以发送数据过来。 使用该方法的时候如果传入了data参数,那么就会先执行socket.write方法把数据发送除去然后再执行socket.end方法。

//net.js
Socket.prototype.end = function(data, encoding) {
  stream.Duplex.prototype.end.call(this, data, encoding);
  this.writable = false;
  if (this.readable && !this._readableState.endEmitted)
    this.read(0);
  else
    maybeDestroy(this);
};

根据上图4.1的继承关系可知,Socket.prototype.end调用的是Writable.prototype.end方法。 end方法发送数据使用的也是write方法。

Writable.prototype.end = function(chunk, encoding, cb) {
  var state = this._writableState;
  if (chunk !== null && chunk !== undefined)
    this.write(chunk, encoding);
  if (state.corked) {
    state.corked = 1;
    this.uncork();
  }
  if (!state.ending && !state.finished)
    endWritable(this, state, cb);
};

endWritable会进行状态判断,然后进行状态的设置以及触发监听事件的回调。比如finish事件。

function endWritable(stream, state, cb) {
  state.ending = true;
  finishMaybe(stream, state);
  //...
  state.ended = true;
  stream.writable = false;
}

maybeDestroy(this)函数最后会调用socket._destroy方法。

Socket.prototype._destroy = function(exception, cb) {
  if (this.destroyed) {
    return;
  }
  this.connecting = false;
  this.readable = this.writable = false;
  for (var s = this; s !== null; s = s._parent)
    timers.unenroll(s);
  if (this._handle) {
    this[BYTES_READ] = this._handle.bytesRead;
    this._handle.close(() => {
      this.emit('close', isException);
    });
    this._handle.onread = noop;
    this._handle = null;
    this._sockname = null;
  }
  this.destroyed = true;
  if (this._server) {
    this._server._connections--;
  }
};

关闭socket最重要的就是状态的还原。this._handle.close()关闭socket连接。

4.3 on(‘data’,func)事件监听器

你可能很好奇,为什么没有讲read方法。其实是有原因的:因为实际中,我们并不知道客户端或者服务器端发送过来 的数据到底有多长。

初始化Socket实例的时候,会添加onread函数。当有数据到达时C/C++核心模块会触发该监听事件:

//net.js [function initSocketHandle]
self._handle.onread = onread;

//[function onread]
function onread(nread, buffer) {
  var handle = this;
  var self = handle.owner;
  if (nread > 0) {
    var ret = self.push(buffer);
    if (handle.reading && !ret) {
      handle.reading = false;
      var err = handle.readStop();
    }
    return;
  }
  if (nread === 0) {
    return;
  }
  self.push(null);

  if (self._readableState.length === 0) {
    self.readable = false;
    maybeDestroy(self);
  }
  self.emit('_socketEnd');
}

self.push(buffer)会触发stream.emit('data',chunk)事件。只有等到stream.emit('readable')触发的时候(也就是:self.push(null)时),才说明数据全部接收完成。

参考文档

TCP/UDP的区别与联系

套接字Socket

listen backlog 的含义