【nodejs原理&源碼賞析(6)】深度剖析cluster模塊源碼與node.js多進(jìn)程(下)
示例代碼托管在:http://www.github.com/dashnowords/blogs
博客園地址:《大史住在大前端》原創(chuàng)博文目錄
華為云社區(qū)地址:【你要的前端打怪升級(jí)指南】
【nodejs原理&源碼賞析(6)】深度剖析cluster模塊源碼與node.js多進(jìn)程(下)一. 引言二.server.listen方法三.cluster._getServer( )方法四.跨進(jìn)程通訊工具方法Utils五.act:queryServer消息六.輪詢調(diào)度Round-Robin-Handle七. 圖解集群建立過(guò)程的邏輯跳轉(zhuǎn)
閱讀本章需要先閱讀本系列前兩章內(nèi)容預(yù)熱一下。
一. 引言
前兩篇博文中已經(jīng)分別介紹了使用cluster模塊建立集群時(shí)主進(jìn)程執(zhí)行cluster.fork( )方法時(shí)的執(zhí)行邏輯,以及net模塊在不同場(chǎng)景下建立通訊的基本原理。本篇繼續(xù)分析cluster模塊,從第一個(gè)子進(jìn)程開(kāi)始建立服務(wù)器講起,cluster基本用法示例代碼再來(lái)一遍:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`主進(jìn)程 ${process.pid} 正在運(yùn)行`);
// 衍生工作進(jìn)程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`工作進(jìn)程 ${worker.process.pid} 已退出`);
});
} else {
// 工作進(jìn)程可以共享任何 TCP 連接。
// 在本例子中,共享的是 HTTP 服務(wù)器。
http.createServer((req, res) => {
res.writeHead(200);
res.end('你好世界\n');
}).listen(8000);
console.log(`工作進(jìn)程 ${process.pid} 已啟動(dòng)`);
}
代碼是足夠精簡(jiǎn)的,實(shí)現(xiàn)過(guò)程也確實(shí)是很龐大的工程。每一個(gè)子進(jìn)程中執(zhí)行的邏輯都是http.createServer().listen(),我們來(lái)看看它是如何一步一步運(yùn)作而最終建立通訊機(jī)制的,你會(huì)發(fā)現(xiàn)它和上一節(jié)中的簡(jiǎn)易模型非常相似。
二.server.listen方法
在http模塊的源碼中很容易找到http.createServer( )方法的邏輯就是透?jìng)鲄?shù)生成了一個(gè)net.Server實(shí)例,這個(gè)實(shí)例在上一節(jié)中就已經(jīng)介紹過(guò),實(shí)際上就只是生成了一個(gè)server的實(shí)例,所以這里跳轉(zhuǎn)到net.Server.prototype.listen()(net.js文件1306-1404行),基本邏輯如下:
Server.prototype.listen = function(...args){
const normalized = normalizeArgs(args);
var options = normalized[0];
/*..獲取監(jiān)聽(tīng)參數(shù)中的句柄對(duì)象..*/
options = options._handle || options.handle || options;
//如果options上有句柄,句柄是一個(gè)TCP實(shí)例
if(options instanceof TCP){
//......
listenInCluster(......);
}
//如果配置參數(shù)中有fd(file descriptor)
if(typeof options.fd === 'number' && options.fd >=0){
//......
listenInCluster(......);
}
//如果參數(shù)中有port端口號(hào)
if(typeof options.port === 'number' || typeof options.port === 'string'){
//.....
listenInCluster(......);
}
//如果參數(shù)中有port端口號(hào) 或 字符型的pipe名稱
if(typeof options.port === 'number' || typeof options.port === 'string'){
//.....
listenInCluster(......);
}
}
這里不難看出它的邏輯就和net模塊官方文檔中描述的server.listen( )的幾種場(chǎng)景對(duì)應(yīng),可以監(jiān)聽(tīng)?zhēng)в蟹强説andle屬性的句柄對(duì)象,數(shù)字型端口號(hào),字符串型命名管道地址,或者直接傳入配置參數(shù)合集options,然后分別根據(jù)幾種不同的情況來(lái)調(diào)用listenInCluster方法(集群功能的邏輯主線是數(shù)字型port,假設(shè)傳入了12315)。
listenInCluster方法定義如下:
大致可以看出,如果是主進(jìn)程,就直接調(diào)用server._listen2()方法然后return了,否則(也就是在工作進(jìn)程中的邏輯,敲黑板!!!這里是重點(diǎn)了),構(gòu)造一個(gè)serverQuery的參數(shù)集,可以看到里面記錄了以各種不同姿勢(shì)調(diào)用這個(gè)方法時(shí)傳入的參數(shù),所以有的參數(shù)為null也很正常,然后調(diào)用了cluster._getServer( ?)方法,這就是工作進(jìn)程在引用cluster模塊時(shí)引入的child.js中定義并掛載在cluster上的方法,最后一個(gè)參數(shù)listenOnMasterHandle是一個(gè)回調(diào)函數(shù),也是一個(gè)錯(cuò)誤前置風(fēng)格的函數(shù),可以看到,它接收了一個(gè)句柄對(duì)象,并把這個(gè)句柄對(duì)象掛載在了子進(jìn)程這個(gè)server實(shí)例的_handle屬性上,接著也調(diào)用了server._listen2( )方法,可以看到兩種情況下調(diào)用這個(gè)方法時(shí)傳入的參數(shù)是一樣的。接著來(lái)到server._listen2( )方法,它綁定了setupListenHandle方法(別抓狂,這是net模塊中相關(guān)邏輯的最后一步了),簡(jiǎn)化代碼如下:
function setupListenHandle(......){
if (this._handle) {
//工作進(jìn)程在執(zhí)行上一步邏輯時(shí),在cluster._getServer()回調(diào)函數(shù)中把一個(gè)handle傳遞給了server._handle
debug('setupListenHandle: have a handle already');
} else {
//主進(jìn)程會(huì)執(zhí)行的邏輯
debug('setupListenHandle: create a handle');
//......
rval = createServerHandle(address, port, addressType, fd, flags);
//......
this._handle = rval;
}
//......
this._handle.onconnection = onconnection;
this._handle[owner_symbol] = this;
//....
}
工作進(jìn)程通過(guò)cluster._getServer( )方法拿到了一個(gè)handle,所以不會(huì)再生成,而主進(jìn)程server.listen(port)執(zhí)行時(shí)會(huì)走到else分支,然后生成一個(gè)新的綁定了端口號(hào)的特殊的socket句柄然后掛載到主進(jìn)程server._handle上,這里對(duì)句柄的connection事件回調(diào)邏輯進(jìn)行了修改,相關(guān)代碼如下:
這里需要注意的是,server._handle的connection事件和server的connection事件是兩碼事,server._handle指向的是一個(gè)綁定了端口的特殊的socket句柄,當(dāng)客戶端connect一個(gè)server時(shí)實(shí)際上底層是客戶端socket與服務(wù)端這個(gè)socket的對(duì)接,所以需要在server._handle這個(gè)的connection回調(diào)函數(shù)中,將客戶端的socket句柄clientHandle重新包裝,然后再通過(guò)觸發(fā)server的connection事件將其轉(zhuǎn)發(fā)給server實(shí)例。所以在使用server實(shí)例時(shí)可以直接添加connectionListener:
let server = net.createServer(socket=>{
/*這個(gè)回調(diào)函數(shù)就是server的connection事件回調(diào)
* 這里接收到的socket就是server._handle的connection收到的客戶端句柄clientHandle封裝成的socket實(shí)例
*/
})
無(wú)論是主進(jìn)程還是子進(jìn)程都會(huì)觸發(fā)這個(gè)邏輯,只需要看成是一種功能性質(zhì)的封裝即可,并不影響業(yè)務(wù)邏輯。
三.cluster._getServer( )方法
下面回到cluster模塊繼續(xù),_getServer( )方法只存在于子進(jìn)程代碼中,源碼位于lib/internal/cluster/child.js,方法定義在54-106行,基本邏輯如下:
cluster._getServer = function(obj, options, cb){
/* 這里的obj就是子進(jìn)程中運(yùn)行上面listenInCluster方法中傳入的server,
* options就是serverQuery,
* cb就是最后要把主進(jìn)程handle傳回去的回調(diào)函數(shù)listenOnMasterHandler
*/
//先構(gòu)建index然后進(jìn)行了一通記錄,就是根據(jù)監(jiān)聽(tīng)的參數(shù)來(lái)構(gòu)建一個(gè)識(shí)別這個(gè)server的索引
//然后構(gòu)建消息
const message = {
act: 'queryServer',
index,
data: null,
...options
};
//......
/* 發(fā)送act:queryServer消息,并傳一個(gè)回調(diào)函數(shù),
* 從形參命名就可以看出,這個(gè)回調(diào)函數(shù)被調(diào)用時(shí)會(huì)被傳入一個(gè)句柄,
* 最后根據(jù)不同的調(diào)度策略來(lái)執(zhí)行不同的函數(shù),這里主要看Round-robin
*/
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); ?// Shared listen socket.
else
rr(reply, indexesKey, cb); ? ? ? ? ? ? ?// Round-robin.
});
//......
}
rr方法將響應(yīng)reply和前一個(gè)調(diào)用者傳入的回調(diào)函數(shù)cb進(jìn)行了透?jìng)鳎瑀r的函數(shù)體就是實(shí)現(xiàn)listen方法偷梁換柱的地方了:
// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
if (message.errno)
return cb(message.errno, null);
var key = message.key;
function listen(backlog) {
return 0;
}
function close() {
if (key === undefined)
return;
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
key = undefined;
}
function getsockname(out) {
if (key)
Object.assign(out, message.sockname);
return 0;
}
const handle = { close, listen, ref: noop, unref: noop };
if (message.sockname) {
handle.getsockname = getsockname; ?// TCP handles only.
}
assert(handles.has(key) === false);
handles.set(key, handle);
cb(0, handle); //這里的cb其實(shí)就是listenInCluster方法中定義的那個(gè)listenOnMasterHandler回調(diào)
}
可以看到rr方法中構(gòu)建了一個(gè)假的handle句柄,并調(diào)用cb將它傳了回去,然后執(zhí)行邏輯回回到net模塊,前文已經(jīng)提這個(gè)handle在回調(diào)函數(shù)中被掛載在了server._handle上,于是setupListenHandle( )的邏輯中也不會(huì)重新構(gòu)建句柄。
重新梳理一下這部分的邏輯,就是子進(jìn)程中調(diào)用listen方法時(shí),會(huì)通過(guò)cluster._getServer( )拿到一個(gè)假句柄,然后執(zhí)行一個(gè)空的listen方法,這樣就避免了端口的重復(fù)監(jiān)聽(tīng)。所以我們可以推測(cè),cluster._getServer( )必然會(huì)觸發(fā)主進(jìn)程啟動(dòng)一個(gè)監(jiān)聽(tīng)端口的服務(wù)器,并建立對(duì)子進(jìn)程的調(diào)度,進(jìn)程之間的IPC通訊可以直接通過(guò)process對(duì)象來(lái)完成,不需要再重新構(gòu)建跨進(jìn)程通訊管道。
四.跨進(jìn)程通訊工具方法Utils
繼續(xù)進(jìn)行后續(xù)內(nèi)容前,先來(lái)看一個(gè)獨(dú)立的跨進(jìn)程通訊工具,源碼放在lib/internal/cluster/utils.js。
它是cluster模塊發(fā)送跨進(jìn)程消息的內(nèi)部代理,這個(gè)模塊對(duì)外暴露了消息發(fā)送方法sendHelper和內(nèi)部消息-的預(yù)處理方法internal,源碼很短就不貼了。當(dāng)子進(jìn)程調(diào)用sendHelper發(fā)送消息時(shí),utils內(nèi)部會(huì)把這條消息處理完成后需要執(zhí)行的回調(diào)函數(shù)先緩存起來(lái),然后給消息添加一些包裝標(biāo)記,然后再發(fā)出去;internal會(huì)對(duì)傳入的內(nèi)部消息-進(jìn)行代理,過(guò)濾掉非NODE_CLUSTER類別的消息,如果消息攜帶的message對(duì)象沒(méi)有ack屬性則最終會(huì)執(zhí)行綁定監(jiān)聽(tīng)時(shí)傳入的回調(diào)函數(shù),否則會(huì)從緩存中找出之前暫存的回調(diào)函數(shù)來(lái)執(zhí)行。
發(fā)個(gè)消息為什么要搞這么復(fù)雜呢?這個(gè)ack屬性又是哪來(lái)的呢?其實(shí)這個(gè)utils模塊主要是在跨進(jìn)程的雙向消息通訊時(shí)實(shí)現(xiàn)了方法復(fù)用,同一個(gè)message從工作進(jìn)程發(fā)往主進(jìn)程時(shí)和主進(jìn)程發(fā)回給工作進(jìn)程時(shí)是由同一個(gè)事件名internalMessage攜帶的,那如何來(lái)區(qū)分消息發(fā)送的方向呢,就是ack屬性,如果消息帶有ack屬性,就表示它是由主進(jìn)程發(fā)給子進(jìn)程的,那么就要調(diào)用子進(jìn)程中的后續(xù)處理方法,這個(gè)方法其實(shí)就是子進(jìn)程發(fā)送消息給主進(jìn)程之前暫存在utils內(nèi)部callbacks里的方法,也就是child.js中cluster._getServer()中調(diào)用send方法時(shí)傳入的回調(diào)方法,也就是net模塊中l(wèi)istenInCluster( )方法中的listenOnMasterHandle方法,這個(gè)方法漂洋過(guò)海透?jìng)髁薔個(gè)函數(shù),的確不容易看懂,“回調(diào)地獄”也的確不是鬧著玩的。再看看沒(méi)有ack屬性的情況,沒(méi)有這個(gè)屬性時(shí)消息是從子進(jìn)程發(fā)給主進(jìn)程的,自然要調(diào)用主進(jìn)程的方法,從邏輯里不難看出,這種情況下方法引用的就是internal方法執(zhí)行時(shí)傳入的第二個(gè)參數(shù)(master.js源碼213行執(zhí)行的internal(worker, onmessage)的onmessage這個(gè)函數(shù)),源碼中就是利用高階函數(shù)這種分步執(zhí)行的特點(diǎn)實(shí)現(xiàn)了引用。
五.act:queryServer消息
故事再回到第三節(jié)工作進(jìn)程中發(fā)出act:'queryServer的消息后,來(lái)看主進(jìn)程master.js中的代碼,主進(jìn)程中在調(diào)用cluster.fork( )時(shí)就綁定了對(duì)worker線程internalMessage的監(jiān)聽(tīng),對(duì)于act:queryServer類型的集群消息,主進(jìn)程已經(jīng)定義了queryServer這個(gè)方法來(lái)處理。這段源代碼的主要邏輯如下:
1.根據(jù)重要參數(shù)組拼接出一個(gè)唯一的key
2.1.根據(jù)key查詢是否有已經(jīng)存在的調(diào)度句柄round-robin-handle,如果有則直接進(jìn)行后續(xù)邏輯
2.2.如果沒(méi)有已經(jīng)存在的調(diào)度句柄,則選擇調(diào)度策略,實(shí)例化一個(gè)調(diào)度句柄,并把它添加進(jìn)記錄里
3.把消息數(shù)據(jù)message.data掛載在調(diào)度句柄的handle.data字段上
4.執(zhí)行調(diào)度句柄的add方法,把子進(jìn)程和一個(gè)回調(diào)方法傳進(jìn)實(shí)例,回調(diào)方法被執(zhí)行時(shí)會(huì)從調(diào)度句柄中取得數(shù)據(jù),并組裝返回消息(帶有ack屬性和其他數(shù)據(jù)的消息)發(fā)給子進(jìn)程,子進(jìn)程收到這個(gè)消息后執(zhí)行的方法,就是前文分析過(guò)的返回假句柄給net模塊中的`listenInCluster()`邏輯。
從開(kāi)篇的多進(jìn)程代碼可以看到,每個(gè)子進(jìn)程中執(zhí)行的listen方法監(jiān)聽(tīng)的端口號(hào)都是一樣的,所以每個(gè)子進(jìn)程發(fā)送queryServer消息給主進(jìn)程并執(zhí)行這段邏輯時(shí),其實(shí)對(duì)應(yīng)的key都是一樣的,所以調(diào)度對(duì)象RoundRobinHandle只會(huì)實(shí)例化一次,在之后的過(guò)程中,每一個(gè)子進(jìn)程會(huì)根據(jù)key獲取到同一個(gè)調(diào)度實(shí)例,并調(diào)用add方法將worker對(duì)象和一個(gè)回調(diào)函數(shù)添加進(jìn)調(diào)度實(shí)例,可以看到回調(diào)函數(shù)執(zhí)行時(shí),就會(huì)將原message中的seq屬性的值添加給ack屬性再掛載上處理后的數(shù)據(jù)并發(fā)送給子進(jìn)程。那么剩下的事情,就剩下調(diào)度對(duì)象RoundRobinHandle的源碼了。
我們不妨來(lái)推測(cè)一下,它的主要邏輯就是在主進(jìn)程中建立真正監(jiān)聽(tīng)目標(biāo)端口的服務(wù)器,并添加當(dāng)客戶端請(qǐng)求到達(dá)時(shí)對(duì)于工作進(jìn)程的調(diào)度代碼,下一節(jié)我們就一起來(lái)驗(yàn)證一下。
六.輪詢調(diào)度Round-Robin-Handle
調(diào)度方法的源碼是internal/cluster/round_robin_handle.js,另一種shared_handle.js是windows下使用的調(diào)度策略,先不做分析(主要是沒(méi)研究過(guò),不敢瞎說(shuō))。先從構(gòu)造函數(shù)開(kāi)始:
16行,bingo,終于看到主進(jìn)程啟動(dòng)服務(wù)器了。接著就是根據(jù)參數(shù)而分流的監(jiān)聽(tīng)方法,集群代碼中對(duì)應(yīng)的是20行的帶有有效port參數(shù)的情況,所以服務(wù)器就在主進(jìn)程啟動(dòng)了,最后來(lái)看看server開(kāi)始觸發(fā)listening事件時(shí)執(zhí)行的邏輯(此處調(diào)用的是once方法,所以只會(huì)執(zhí)行一次):
1.將主進(jìn)程server的內(nèi)部_handle句柄,掛載給round-robin-handle實(shí)例
2.當(dāng)這個(gè)句柄被連接時(shí)(也就是客戶端socket執(zhí)行connect方法連接后),會(huì)觸發(fā)它的`connection`事件,回調(diào)函數(shù)會(huì)調(diào)用`distribute`方法來(lái)分發(fā)這個(gè)客戶端socket句柄,注意32行后面半句的箭頭函數(shù)方法,這里的handle就是指客戶端`socket`實(shí)例。
3.將server._handle指向null
4.將server屬性指向null
如果你還記得net模塊中l(wèi)isten方法的邏輯的話可能會(huì)有印象,_handle的connection事件回調(diào)其實(shí)原本已經(jīng)被復(fù)寫過(guò)一次了,也就是說(shuō)單進(jìn)程運(yùn)行的程序在建立服務(wù)器時(shí),server._handle的connection事件會(huì)觸發(fā)server實(shí)例的connection事件,而在集群模式下,主進(jìn)程中調(diào)度實(shí)例中服務(wù)器句柄server._handle的connection再次被復(fù)寫,將邏輯改變?yōu)榉职l(fā)socket,而子進(jìn)程中的server._handle還是保持原來(lái)的邏輯。
最后一步指向null的邏輯還涉及到add方法,繼續(xù)看主進(jìn)程中調(diào)用的add方法:
這個(gè)send形參實(shí)際上就是主進(jìn)程中傳入的最終向子進(jìn)程發(fā)送返回消息的那個(gè)回調(diào)函數(shù),它被封裝進(jìn)了done函數(shù),這里需要著重看一下55行的邏輯,this.server === null這個(gè)條件實(shí)際上對(duì)應(yīng)的就是構(gòu)造函數(shù)中服務(wù)器開(kāi)始監(jiān)聽(tīng)的事件,所以55-59行的代碼以及構(gòu)造函數(shù)中添加的listening事件的回調(diào)函數(shù)需要聯(lián)合在一起來(lái)理解,也就是每個(gè)子進(jìn)程的send方法都被包裹在一個(gè)獨(dú)立的done函數(shù)中,這個(gè)函數(shù)會(huì)在主進(jìn)程的server處于listening狀態(tài)后觸發(fā)執(zhí)行,并且只觸發(fā)一次。當(dāng)它觸發(fā)時(shí),會(huì)從實(shí)例的handle屬性(也就是server的_handle句柄)上取得socket名稱然后調(diào)用send方法,這個(gè)特殊socket的名稱在回調(diào)函數(shù)中對(duì)應(yīng)reply形參,最終掛載在message中發(fā)回了子進(jìn)程。
至此其實(shí)主進(jìn)程和子進(jìn)程建立服務(wù)器的消息已經(jīng)完成了閉環(huán)。最后再看一下RoundRobinHandle中最后兩個(gè)方法:
當(dāng)客戶端socket執(zhí)行connect方法連接到主進(jìn)程server的句柄后,主進(jìn)程會(huì)調(diào)用round-robin-handle實(shí)例的distribute方法,這個(gè)方法的邏輯比較簡(jiǎn)單,把這個(gè)客戶端句柄加入到待處理隊(duì)列,然后從空閑進(jìn)程隊(duì)列頭部取出一個(gè)worker進(jìn)程,把它作為參數(shù)傳給handoff方法。
handoff方法中,從客戶端請(qǐng)求句柄隊(duì)列的頭部取出下一個(gè)待處理的socket,如果已經(jīng)沒(méi)有要處理的請(qǐng)求,就把傳進(jìn)來(lái)的worker放回空閑子進(jìn)程隊(duì)列free中。在add方法內(nèi)部封裝的done方法中也執(zhí)行了這個(gè)handoff方法,現(xiàn)在再回過(guò)頭來(lái)看這個(gè)add方法的作用,就是當(dāng)主進(jìn)程處于監(jiān)聽(tīng)狀態(tài)后,將每一個(gè)子進(jìn)程對(duì)象worker依次添加到空閑進(jìn)程隊(duì)列free中。最后夠早了一個(gè)新的act:newconn消息,并通過(guò)調(diào)度選出的worker.process對(duì)象實(shí)現(xiàn)跨進(jìn)程通訊來(lái)將待處理句柄和【新連接】消息發(fā)送給子進(jìn)程。
七. 圖解集群建立過(guò)程的邏輯跳轉(zhuǎn)
集群建立過(guò)程的邏輯大致的跳轉(zhuǎn)路徑如下,細(xì)節(jié)部分直接參考前文的講解即可。
Node.js
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。