net.js模块是Node.js异步网络中最重要的一部分。 相信大多数读者都使用过net.js模块。
那net.js模块它底层到底做了什么,让我们有异步网络的能力。反过来问就是说:net.js底层通过什么来实现异步网络。
这篇文章并不会告诉你如何的使用net.js模块。我希望通过这篇文章告诉你,net.js模块是如何实现这个异步网络。
我们知道Node.js的大部分能力都是由它的c/c++模块提供的,net.js模块也不例外。
那么net.js关键的知识就是:
提供的是面向连接、可靠的字节流服务。当客户和服务器彼此交换数据前,必须先在双方之间建立一个TCP连接,之后才能传输数据。 TCP提供超时重发,丢弃重复数据,检验数据,流量控制等功能,保证数据能从一端传另一端.
像TCP/IP、TCP和UDP等协议只有一套,而我们系统多个TCP连接或多个应用程序进程必须通过同一个TCP协议端口传输数据。 为了区别不同的应用程序进程和连接,许多计算机操作系统为应用程序与TCP/IP协议交互提供了称为套接字(Socket)的接口。
套接字就是支持TCP/IP网络通信的基本操作单元,是我们进行TCP/IP进行通信的接口。
我们可以很轻松的创建一个TCP服务器和客户端。
创建一个 TCP 服务器:
//server.js
const net = require('net');
var server = net.createServer((socket)=>{
socket.on('data', (data) => {
console.log('client send message: ' , data.toString());
});
socket.write('hello client!');
});
server.listen(8888,'127.0.0.1' , ()=>{
console.log(server.address());
});
应答服务器的客户端:
//client.js
const net = require('net');
var client = net.connect({port: 8888, host: '127.0.0.1'}, function() {
client.write('hello server!\r\n');
});
client.on('data', (data) => {
console.log('server send message: ' ,data.toString());
client.end();
});
client.on('end', () => {
console.log('disconnected from server');
});
使用cmd
分别启动TCP服务器和客户端,这样就可以与服务器通讯了。
node server.js
node client.js
一次最简单的通讯:
万变不离其宗,Node.js的TCP通讯也是在C的基础上搭建起来的。
先来看看C编写的TCP服务是怎么样的吧。
通过gcc
命令分别编译socket.c和socket_client.c文件。然后分别执行编译后的可执行文件socket_server 和 socket_client。这时候TCP服务器和TCP客户端就建立好连接了。
gcc socket.c -o ./socket_server
gcc socket_client.c -o ./socket_client
通过命令行直接启动TCP服务器和客户端即可进行TCP通讯。
socket_server
socket_client ip //ip为你TCP服务器当前的ip地址。如果是本机测试可以使用ifconfig查询ip,本案例默认服务器端口为6666
那既然都知道了TCP的底层是这样实现的,那如何把底层TCP服务提供给Node.js使用呢。
首先TCP服务器端会创建一个socket;然后bind系统调用把addr中的地址分配给与描述符socket关联的未命名套接字,地址结构的长度由addr_len指定, 通过socket调用创建的套接字必须经过命名(绑定地址)后才能使用;然后调用listen()来监听这个socket。
从图中可以清晰的看出,服务器端与客户端的区别在于,服务器端监听端口等待连接请求,而客户端则是请求连接。
回到net.js模块是如何实现这个TCP服务器的创建的呢。
首当其冲的就是
看上面的TCP服务器创建代码,你可能会以为net.createServer()
函数调用之后就会创建一个socket实例对象。
其实不是的,net.createServer()
只是用来初始化参数的,包含连接成功后的回调函数。
//net.js
function Server(options, connectionListener) {
EventEmitter.call(this);
this.on('connection', connectionListener);
this._handle = null;
this._usingSlaves = false;
this._slaves = [];
this._unref = false;
this.allowHalfOpen = options.allowHalfOpen || false;
this.pauseOnConnect = !!options.pauseOnConnect;
}
Server
对象,最主要的参数是_handle
,这个参数保存着服务器的socket。
那么何时才创建socket呢。答案在listen
函数。
与net.createServer相同
整个socket的创建是从这里开始的,listen
函数主要对传入的参数进行判断以及补全。
//net.js [Function Server.prototype.listen]
var [options, cb] = normalizeArgs(args);
//...
if (options instanceof TCP) { //tcp
this._handle = options;
listen(this, null, -1, -1, backlog);
} else if (typeof options.fd === 'number' && options.fd >= 0) {//根据fd创建
listen(this, null, null, null, backlog, options.fd);
} else {
backlog = options.backlog || backlog;
if (typeof options.port === 'number' || typeof options.port === 'string' ||
(typeof options.port === 'undefined' && 'port' in options)) {
assertPort(options.port);
if (options.host) {// eq: 127.0.0.1 : 80
//检测ip 并监听
lookupAndListen(this, options.port | 0, options.host, backlog,
options.exclusive);
} else {
listen(this, null, options.port | 0, 4, backlog, undefined,
options.exclusive);
}
//创建管道pipe
} else if (options.path && isPipeName(options.path)) {
// UNIX socket or Windows pipe.
const pipeName = this._pipeName = options.path;
listen(this, pipeName, -1, -1, backlog, undefined, options.exclusive);
}
}
首先会通过对传入的参数使用normalizeArgs
函数进行序列化。backlog
表示等待TCP连接的队列大小。
传参情况:
函数返回的options
是一个对象,cb
就是创建完TCP服务器后的回调函数。
function normalizeArgs(args) {
var options = {};
if (args.length === 0) {
return [options];
} else if (args[0] !== null && typeof args[0] === 'object') {
// connect(options, [cb])
options = args[0];
} else if (isPipeName(args[0])) {
// connect(path, [cb]);
options.path = args[0];
} else {
// connect(port, [host], [cb])
options.port = args[0];
if (args.length > 1 && typeof args[1] === 'string') {
options.host = args[1];
}
}
var cb = args[args.length - 1];
if (typeof cb !== 'function')
cb = null;
return [options, cb];
}
如果我们传入了ip地址(上面创建TCP服务器就传入的ip),使用dns模块查询传入的ip地址的ip族。
require('dns').lookup(address, function doListening(err, ip, addressType) {
addressType = ip ? addressType : 4;
listen(self, ip, port, addressType, backlog, undefined, exclusive);//监听,回归正常流程
}
});
这个普通函数的工作就是判断到底是使用哪条进程来创建这个服务器。
//net.js [function listen]
if (!cluster) cluster = require('cluster');
//主进程 或者 独享一个进程和端口
if (cluster.isMaster || exclusive) {
self._listen2(address, port, addressType, backlog, fd);
return;
}
//工作线程
cluster._getServer(self, {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags: 0
}, cb);
function cb(err, handle) {
self._handle = handle;
self._listen2(address, port, addressType, backlog, fd);
}
成功确认使用哪条进程创建服务后,就要开始创建服务并且监听。
创建服务器net.js模块的工作都头了,现在剩下正在核心的部分应该让c/c++模块来实现了。
//net.js [function Server.prototype._listen2]
if (!address && typeof fd !== 'number') {
this._handle = createServerHandle('::', port, 6, fd);//创建服务器句柄
if ( this._handle === null)//一般创建TCP服务器的情况
this._handle = createServerHandle(address, port, addressType, fd);//创建服务器句柄
this._handle.onconnection = onconnection;
_listen(this._handle, backlog);//执行c++模块,打通服务
这个函数里面最重要的就只有3句话。
这个函数会返回服务器句柄。
/**
* [createServerHandle 创建服务器句柄]
* @param {[type]} address [ip地址]
* @param {[type]} port [端口号]
* @param {[type]} addressType [ip类型]
* @param {[type]} fd [文件描述符]
* @return {[type]} [description]
*/
function createServerHandle(address, port, addressType, fd) {
//文件描述符
if (typeof fd === 'number' && fd >= 0) {
handle = createHandle(fd);
} else if (port === -1 && addressType === -1) {//管道
handle = new Pipe();
} else { //TCP服务
handle = new TCP();
}
if (!address) {
err = handle.bind6('::', port);
if (err) {
handle.close();
// Fallback to ipv4
return createServerHandle('0.0.0.0', port);
}
} else if (addressType === 6) {
err = handle.bind6(address, port);//ip6
} else {
err = handle.bind(address, port);//ip4
}
return handle;
}
Pipe
,TCP
, TTYWrap
这3个对象都是由c/c++模块提供的。我们很清楚的看出createServerHandle
函数。
正常情况下已经完成服务器的创建已经端口绑定。
/**
* [createHandle 创建句柄]
* @param {[type]} fd [文件描述符]
* @return {[type]} [description]
*/
function createHandle(fd) {
//判断文件描述符类型
var type = TTYWrap.guessHandleType(fd);
if (type === 'PIPE') return new Pipe();
if (type === 'TCP') return new TCP();
}
c/c++模块中guessHandleType
会根据文件描述符fd
来判断类型:
//tty_wrap.cc [GuessHandleType]
uv_handle_type t = uv_guess_handle(fd);
const char* type = nullptr;
switch (t) {
case UV_TCP: type = "TCP"; break;
case UV_TTY: type = "TTY"; break;
case UV_UDP: type = "UDP"; break;
case UV_FILE: type = "FILE"; break;
case UV_NAMED_PIPE: type = "PIPE"; break;
case UV_UNKNOWN_HANDLE: type = "UNKNOWN"; break;
default:
ABORT();
}
_listen 执行c++模块创建底层的服务。
handle.listen(backlog || 511);
//tcp_wrap.cc
int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_),
backlog,
OnConnection);
//stream.c
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int err;
switch (stream->type) {
case UV_TCP:
err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
break;
case UV_NAMED_PIPE:
err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
break;
default:
err = -EINVAL;
}
if (err == 0)
uv__handle_start(stream);
return err;
}
好咯,到这里服务器服务要做的都做完了,就等着客户端发起请求连接。
当客户端发起请求连接,服务器接受请求后就会执行这个函数。创建Socket
对象,连接客户端。
/**
* [onconnection 连接成功处理函数]
* 这个函数是给c++调用的
* @param {[type]} err [description]
* @param {[type]} clientHandle [description]
* @return {[type]} [description]
*/
function onconnection(err, clientHandle) {
var handle = this;
var self = handle.owner;
//创建socket对象
var socket = new Socket({
handle: clientHandle,
allowHalfOpen: self.allowHalfOpen,
pauseOnCreate: self.pauseOnConnect
});
//触发connection函数,也就是我们使用net.createServer时传入的函数
self.emit('connection', socket);
}
这个的net.js部分是没有代码显示的调用this._handle.onconnection
函数的。这部分是c/c++模块做的。
答案就在3.4.2的uv_listen函数的OnConnection
函数中。
//connection_wrap.cc
void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle,
int status) {
WrapType* wrap_data = static_cast<WrapType*>(handle->data);
if (status == 0) {
Local<Object> client_obj = WrapType::Instantiate(env, wrap_data);//客户端对象
uv_stream_t* client_handle =
reinterpret_cast<uv_stream_t*>(&wrap->handle_);//客户端对象句柄
if (uv_accept(handle, client_handle))//接受客户端请求
return;
argv[1] = client_obj;// 接受请求成功,执行js那边的回调。
}
//执行onconnection 函数
wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);
}
接下来就是socket的事了。
Socket
对象创建TCP客户端。主要包含2个方法和一个事件监听器。
通过net.connect
创建Socket
实例对象。
我们先来看看它们的继承关系。
Socket.prototype.write
的原型方法其实是使用的stream.Duplex.prototype.write
的原型方法,
而stream.Duplex
的write
方法其实又是来自Writable.prototype.write
的原型方法。
所以我们最终调用的是Writeable
的原型方法。
//net.js
Socket.prototype.write = function(chunk, encoding, cb) {
return stream.Duplex.prototype.write.apply(this, arguments);
};
当我们执行socket.write('send to server')
时,会执行writeOrBuffer
方法。
//stram_writeable.js
Writable.prototype.write = function(chunk, encoding, cb) {
//...
if (state.ended)
writeAfterEnd(this, cb);
else if (validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}
return ret;
};
writeOrBuffer
方法会进行参数判断和配置。一切正常就会执行doWrite
方法。
//stram_writeable.js
function writeOrBuffer(stream, state, chunk, encoding, cb) {
chunk = decodeChunk(state, chunk, encoding);//解析数据
if (chunk instanceof Buffer)
encoding = 'buffer';
var len = state.objectMode ? 1 : chunk.length;
if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;
} else {//一切正常
doWrite(stream, state, false, len, chunk, encoding, cb);
}
return ret;
}
doWrite
表示,到了这一步,就要开始往socket
的另一端开始写数据了,stream
对象就是socket
对象。
//stram_writeable.js
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
//...
if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}
Socket
原型有_writev
和_write
方法,都是Socket.prototype._writeGeneric
方法的不同参数调用而已。
//net.js
/**
* [_writeGeneric 往客户端写数据]
* @param {[type]} writev [description]
* @param {[type]} data [数据]
* @param {[type]} encoding [编码]
* @param {Function} cb [回调]
* @return {[type]} [description]
*/
Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
//...
var req = new WriteWrap();//创建写请求
req.handle = this._handle;
req.oncomplete = afterWrite;
req.async = false;
var err;
if (writev) {
var chunks = new Array(data.length << 1);
for (var i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
err = this._handle.writev(req, chunks);//数据传输
// Retain chunks
if (err === 0) req._chunks = chunks;
} else {
var enc;
if (data instanceof Buffer) {
enc = 'buffer';
} else {
enc = encoding;
}
err = createWriteReq(req, this._handle, data, enc);//发送数据
}
if (req.async && this._handle.writeQueueSize != 0)//异步
req.cb = cb;
else
cb();
};
createWriteReq
就是创建写数据请求,通过C/C++核心模块的stream对象进行写数据。
到这里JavaScript
模块的工作就完成了,剩下的就交给C/C++完成。
//net.js
/**
* [createWriteReq 数据传输]
* @param {[type]} req [请求对象]
* @param {[type]} handle [句柄]
* @param {[type]} data [数据]
* @param {[type]} encoding [编码]
* @return {[type]} [description]
*/
function createWriteReq(req, handle, data, encoding) {
switch (encoding) {
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
case 'buffer':
return handle.writeBuffer(req, data);
case 'utf8':
case 'utf-8':
return handle.writeUtf8String(req, data);
case 'ascii':
return handle.writeAsciiString(req, data);
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return handle.writeUcs2String(req, data);
default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
}
}
createWriteReq
函数会根据encoding
来调用不同的C/C++模块的方法。
handle.writeBuffer
等方法都可以在stream_base.cc
里面找到。让我们来看看C/C++部分如何实现的。
//stream_wrap.cc
int StreamWrap::DoWrite(WriteWrap* w,uv_buf_t* bufs,size_t count,uv_stream_t* send_handle) {
int r;
if (send_handle == nullptr) {
r = uv_write(w->req(), stream(), bufs, count, AfterWrite);
} else {
r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite);
}
return r;
}
好吧!已经进入了调用libuv函数的地步了。追到最后发现了sendmsg
函数,没错已经算到头了。
//deps/uv/src/unix/stream.c
do {
n = sendmsg(uv__stream_fd(stream), &msg, 0);
}
#if defined(__APPLE__)
while (n == -1 && (errno == EINTR || errno == EPROTOTYPE));
#else
while (n == -1 && errno == EINTR);
#endif
sendmsg()用来将数据由指定的socket传给对方主机.
我们先来看看Node.js官方文档是怎么描述这个方法的。
Half-closes the socket. i.e., it sends a FIN packet. It is possible the server will still send some data.
If
data
is specified, it is equivalent to callingsocket.write(data, encoding)
followed bysocket.end()
.
关闭socket的一半,及变成半双工模式,发送FIN
报文。但服务器依然可以发送数据过来。
使用该方法的时候如果传入了data
参数,那么就会先执行socket.write
方法把数据发送除去然后再执行socket.end
方法。
//net.js
Socket.prototype.end = function(data, encoding) {
stream.Duplex.prototype.end.call(this, data, encoding);
this.writable = false;
if (this.readable && !this._readableState.endEmitted)
this.read(0);
else
maybeDestroy(this);
};
根据上图4.1的继承关系可知,Socket.prototype.end
调用的是Writable.prototype.end
方法。
end
方法发送数据使用的也是write
方法。
Writable.prototype.end = function(chunk, encoding, cb) {
var state = this._writableState;
if (chunk !== null && chunk !== undefined)
this.write(chunk, encoding);
if (state.corked) {
state.corked = 1;
this.uncork();
}
if (!state.ending && !state.finished)
endWritable(this, state, cb);
};
endWritable
会进行状态判断,然后进行状态的设置以及触发监听事件的回调。比如finish
事件。
function endWritable(stream, state, cb) {
state.ending = true;
finishMaybe(stream, state);
//...
state.ended = true;
stream.writable = false;
}
maybeDestroy(this)
函数最后会调用socket._destroy
方法。
Socket.prototype._destroy = function(exception, cb) {
if (this.destroyed) {
return;
}
this.connecting = false;
this.readable = this.writable = false;
for (var s = this; s !== null; s = s._parent)
timers.unenroll(s);
if (this._handle) {
this[BYTES_READ] = this._handle.bytesRead;
this._handle.close(() => {
this.emit('close', isException);
});
this._handle.onread = noop;
this._handle = null;
this._sockname = null;
}
this.destroyed = true;
if (this._server) {
this._server._connections--;
}
};
关闭socket最重要的就是状态的还原。this._handle.close()
关闭socket连接。
你可能很好奇,为什么没有讲read
方法。其实是有原因的:因为实际中,我们并不知道客户端或者服务器端发送过来
的数据到底有多长。
初始化Socket实例的时候,会添加onread
函数。当有数据到达时C/C++核心模块会触发该监听事件:
//net.js [function initSocketHandle]
self._handle.onread = onread;
//[function onread]
function onread(nread, buffer) {
var handle = this;
var self = handle.owner;
if (nread > 0) {
var ret = self.push(buffer);
if (handle.reading && !ret) {
handle.reading = false;
var err = handle.readStop();
}
return;
}
if (nread === 0) {
return;
}
self.push(null);
if (self._readableState.length === 0) {
self.readable = false;
maybeDestroy(self);
}
self.emit('_socketEnd');
}
self.push(buffer)
会触发stream.emit('data',chunk)
事件。只有等到stream.emit('readable')
触发的时候(也就是:self.push(null)
时),才说明数据全部接收完成。