IPC(Inter-Process Communication),即进程间通信,目的是让不同进程能互相访问资源,并进行协调工作。
实现 IPC 的技术有很多,比如:
- 命名(匿名)管道
- socket(通过网络 socket 互相发送也是一种 IPC)
- 信号量
- 共享内存
- 消息队列
- UDS(Unix Domain Socket)
但以上的方式中,除了信号(操作系统提供了接口让进程可以直接修改另一个进程的数据 PCB)外,基本上都是使用独立与进程的额外内存作为信息承载的地方,然后在通过某种方式让多个进程都可以访问到这块公共内存,比如管道、共享内存、Unix 域、消息队列等等。
一、Node 内部 IPC 原理
在 Node 中,实现 IPC 是通过“管道技术”。但此管道非彼管道,在 Node 中管道只是一个抽象层面的称呼。具体细节由 libuv 提供。我们只需记住,在 *nix 系统中,管道由 Unix Domain Socket 实现。
在应用层面,IPC 只有简单的 message 事件和 send() 方法。IPC 通道被抽象为 Stream 对象,在调用 send() 发送数据时,接受到的消息会通过 message 事件触发给应用层。
父进程在实际创建子进程前,会创建IPC 管道并监听它,并通过环境变量(NODE_CHANNEL_FD)告诉子进程这个 IPC 通道的文件描述符。子进程在启动过程中,根据文件描述符去连接这个已存在的 IPC 通道,从而完成父子进程间的连接。
二、利用 IPC 进行句柄传递
Node IPC 中的 send 方法除了发送数据外,还可以发送句柄。
child.send(message, [sendHandle])
2.1 句柄
句柄是一种可以用来标识资源的引用,内部包含了指向对象的文件描述符。比如,句柄可以用来标识一个服务端 socket 对象、一个客户端 socket 对象、一个 UDP 套接字、一个管道等。
2.2 利用句柄传递解决多进程监听同一端口问题
主进程接收到 socket 请求后,将这个 socket 直接发送给工作进程,而不是重新与工作进程之间简历新的 socket 连接来转发数据。
主进程代码:
const childProcess = require('child_process');
const net = require('net');
const worker = childProcess.fork(filepath);
// 主进程监听端口
const server = net.createServer((socket) => {
// 通过文件描述符传递把连接传递到子进程处理
worker.send(null, socket);
});
server.listen(8080);
子进程代码:
process.on('message', (message, socket) => {
socket.destroy();
});
2.3 句柄传递的原理
send() 方法可以发送的句柄类型包括一下几种:
- net.Socket (TCP 套接字)
- net.Server (TCP 服务器)
- net.Native (C++ 层面的 TCP 套接字或 IPC 管道)
- dgram.Socket (UDP 套接字)
- dgram.Native (C++层面的 UDP 套接字)
2.3.1 句柄的发送与还原
发送包装
send() 方法在将消息发送到 IPC 管道前,将消息组装成两个对象。一个参数是 handle,另一个是 message。如,message 参数被组装成:
{
cmd: 'NODE_HANDLE',
type: 'net.Server',
msg: message
}
这个 message 对象在写入到 IPC 管道时,会通过 JSON.stringfy() 进行序列化。最终发送到 IPC 通道中到信息都是字符串。
发送到 IPC 管道的实际是我们要发送的句柄文件描述符,文件描述符实际上是一个整数值。
接收还原
连接了 IPC 通道的子进程,可以读取父进程发来的消息,将字符串通过 JSON.parse() 解析得到对象后,触发 message 事件将消息题传递给应用层使用。(在这个过程中,消息对象被过来处理,message.cmd 的值若以 NODE_ 为前缀,将响应一个内部事件 internalMessage。若 message.cmd 值为 NODE_HANDLE, 则将取出 message.type 值和得到的文件描述符一起还原出一个对应对象。 )

比如,在还原一个 tcp 服务器句柄。子进程会根据 message.type 创建对应的 TCP 服务器对象,然后监听到文件描述符上。
function(message, handle, emit){
var self = this;
var server = new net.Server();
server.listen(handle, function(){
emit(server)
})
}
Node 进程间只有消息传递,并不会真正的传递对象,这种错觉是抽象封装的结果。
2.3.2 端口共同监听
在上面的例子中,我们通过发送句柄,多个进程可以监听到相同的端口,而不引起 EADDRINUSE 异常。这是为什么?
其实答案很简单,我们独立启动的进程,TCP 服务器端 socket 套接字的文件描述符并不相同,导致监听到相同的端口会抛出异常。
Node 底层对每个端口监听都设置了 SO_REUSEADDR 选项,这个选项的含义是不同进程可以就相同的网卡和端口进行监听,这个服务器端套接字可以被不同进程服用。
由于独立启动的进程,彼此间不知道文件描述符,所以监听相同的端口就会失败。但对于 send() 发送的句柄还原出的服务来说,文件符相同,所以监听相同端口不会引起异常。
抢占式
但多个进程监听相同端口时,文件描述符同一时间只能被某个进程所用。所以服务器端处理请求时,只有一个幸运的进程能够抢到连接,并为这个请求服务。
基于 child_process 和 net 模块的组合,Node 提供了 cluster 管理服务集群,使得开发者不必关注 通过 child_process 实现的单机 Node 集群。
三、UDS
3.1 概述
Unix domain socket 又叫 IPC(inter-process communication 进程间通信) socket,用于实现同一主机上的进程间通信。
socket 原本是为网络通讯设计的,但后来在 socket 的框架上发展出一种 IPC 机制,就是 UNIX domain socket。
虽然网络 socket 也可用于同一台主机的进程间通讯(通过 loopback 地址 127.0.0.1),但是 UNIX domain socket 用于 IPC 更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。(IPC 机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。)
UNIX domain socket 是全双工的,API 接口语义丰富,相比其它 IPC 机制有明显的优越性,目前已成为使用最广泛的 IPC 机制。
unix domain 有一个缺点,就是要创建一个文件(新的内核可以不用)。进程 crash 可能会导致这个文件没有被删除,比较尴尬
3.2 使用 UDS 进行 IPC
Unix domain socket 主要用于同一主机上的进程间通信。与主机间的进程通信不同,它不是通过 “IP 地址 + TCP 或 UDP 端口号” 的方式进程通信,而是使用 socket 类型的文件来完成通信,因此在稳定性、可靠性以及效率方面的表现都很不错。
3.2.1 创建 IPC 服务器
server.listen(path[, backlog][, callback]) 用于 IPC 服务器
net.createServer().listen(
path.join('\\\\?\\pipe', process.cwd(), 'myctl'));
// Unix 域套接字将在文件系统中可见,并且会一直存在,直到取消链接
如:
http
.createServer((req: any, res: Writable) => {
try {
let body = '';
req.on('data', (chunk: string) => {
body += chunk;
});
req.on('end', async () => {
try {
const buffer = Buffer.from(body, 'base64');
const dataBuf = await unzipFn(buffer);
const options: BeaconParams = JSON.parse(dataBuf.toString());
const metricsReportRes = await this.handleRecords(options);
if (metricsReportRes.some(item => item)) {
res.end(
responseHandler(
ReporterServerStatus.OVER_METRICS_SIZE,
metricsReportRes.filter(item => item)
)
);
} else {
res.end(responseHandler(ReporterServerStatus.OK));
}
} catch (error) {
appLogger.error('promethous server handle metrics error', error);
res.end(responseHandler(ReporterServerStatus.HANDLE_METRICS_ERROR));
}
});
} catch (e) {
appLogger.error('server error', e);
res.end(responseHandler(ReporterServerStatus.INTERNAL_ERROR));
}
})
.listen(sockFile);
3.2.2 向一个 Unix domain socket 发送数据
import request from 'request';
request(
{
uri: `http://unix:${sockFile}`,
method: 'post',
body: dataStr
},
(err, res, body) => {
if (err) {
appLogger.error(`Emitter|result|prometheus server error, ${err}`);
reject(data);
}
const { code, msg, data: resData } = safeParse(body) as ReporterRes;
if (code === 0) {
resolve(msg);
}
}
)