C/C++ 服務(wù)器并發(fā)
1. 單線程 / 進(jìn)程
在 TCP 通信過程中,服務(wù)器端啟動之后可以同時和多個客戶端建立連接,并進(jìn)行網(wǎng)絡(luò)通信,但是在介紹 TCP 通信流程的時候,提供的服務(wù)器代碼卻不能完成這樣的需求,先簡單的看一下之前的服務(wù)器代碼的處理思路,再來分析代碼中的弊端:
//?server.c
#include?
#include?
#include?
#include?
#include?
int?main()
{
//?1.?創(chuàng)建監(jiān)聽的套接字
int?lfd?=?socket(AF_INET,?SOCK_STREAM,?0);
//?2.?將socket()返回值和本地的IP端口綁定到一起
struct?sockaddr_in?addr;
addr.sin_family?=?AF_INET;
addr.sin_port?=?htons(10000);???//?大端端口
//?INADDR_ANY代表本機(jī)的所有IP,?假設(shè)有三個網(wǎng)卡就有三個IP地址
//?這個宏可以代表任意一個IP地址
addr.sin_addr.s_addr?=?INADDR_ANY;??//?這個宏的值為0?==?0.0.0.0
int?ret?=?bind(lfd,?(struct?sockaddr*)&addr,?sizeof(addr));
//?3.?設(shè)置監(jiān)聽
ret?=?listen(lfd,?128);
//?4.?阻塞等待并接受客戶端連接
struct?sockaddr_in?cliaddr;
int?clilen?=?sizeof(cliaddr);
int?cfd?=?accept(lfd,?(struct?sockaddr*)&cliaddr,?&clilen);
//?5.?和客戶端通信
while(1)
{
//?接收數(shù)據(jù)
char?buf[1024];
memset(buf,?0,?sizeof(buf));
int?len?=?read(cfd,?buf,?sizeof(buf));
if(len?>?0)
{
printf("客戶端say:?%s\n",?buf);
write(cfd,?buf,?len);
}
else?if(len??==?0)
{
printf("客戶端斷開了連接...\n");
break;
}
else
{
perror("read");
break;
}
}
close(cfd);
close(lfd);
return?0;
}
在上面的代碼中用到了三個會引起程序阻塞的函數(shù),分別是:
accept():如果服務(wù)器端沒有新客戶端連接,阻塞當(dāng)前進(jìn)程 / 線程,如果檢測到新連接解除阻塞,建立連接
read():如果通信的套接字對應(yīng)的讀緩沖區(qū)沒有數(shù)據(jù),阻塞當(dāng)前進(jìn)程 / 線程,檢測到數(shù)據(jù)解除阻塞,接收數(shù)據(jù)
write():如果通信的套接字寫緩沖區(qū)被寫滿了,阻塞當(dāng)前進(jìn)程 / 線程(這種情況比較少見)
如果需要和發(fā)起新的連接請求的客戶端建立連接,那么就必須在服務(wù)器端通過一個循環(huán)調(diào)用 accept() 函數(shù),另外已經(jīng)和服務(wù)器建立連接的客戶端需要和服務(wù)器通信,發(fā)送數(shù)據(jù)時的阻塞可以忽略,當(dāng)接收不到數(shù)據(jù)時程序也會被阻塞,這時候就會非常矛盾,被 accept() 阻塞就無法通信,被 read() 阻塞就無法和客戶端建立新連接。因此得出一個結(jié)論,基于上述處理方式,在單線程 / 單進(jìn)程場景下,服務(wù)器是無法處理多連接的,解決方案也有很多,常用的有四種:
使用多線程實(shí)現(xiàn)
使用多進(jìn)程實(shí)現(xiàn)
使用 IO 多路轉(zhuǎn)接(復(fù)用)實(shí)現(xiàn)
使用 IO 多路轉(zhuǎn)接 + 多線程實(shí)現(xiàn)
2. 多進(jìn)程并發(fā)
如果要編寫多進(jìn)程版的并發(fā)服務(wù)器程序,首先要考慮,創(chuàng)建出的多個進(jìn)程都是什么角色,這樣就可以在程序中對號入座了。在 Tcp 服務(wù)器端一共有兩個角色,分別是:監(jiān)聽和通信,監(jiān)聽是一個持續(xù)的動作,如果有新連接就建立連接,如果沒有新連接就阻塞。關(guān)于通信是需要和多個客戶端同時進(jìn)行的,因此需要多個進(jìn)程,這樣才能達(dá)到互不影響的效果。進(jìn)程也有兩大類:父進(jìn)程和子進(jìn)程,通過分析我們可以這樣分配進(jìn)程:
父進(jìn)程:
負(fù)責(zé)監(jiān)聽,處理客戶端的連接請求,也就是在父進(jìn)程中循環(huán)調(diào)用 accept() 函數(shù)
創(chuàng)建子進(jìn)程:建立一個新的連接,就創(chuàng)建一個新的子進(jìn)程,讓這個子進(jìn)程和對應(yīng)的客戶端通信
回收子進(jìn)程資源:子進(jìn)程退出回收其內(nèi)核 PCB 資源,防止出現(xiàn)僵尸進(jìn)程
子進(jìn)程:
負(fù)責(zé)通信,基于父進(jìn)程建立新連接之后得到的文件描述符,和對應(yīng)的客戶端完成數(shù)據(jù)的接收和發(fā)送。
發(fā)送數(shù)據(jù):send() / write()
接收數(shù)據(jù):recv() / read()
在多進(jìn)程版的服務(wù)器端程序中,多個進(jìn)程是有血緣關(guān)系,對應(yīng)有血緣關(guān)系的進(jìn)程來說,還需要想明白他們有哪些資源是可以被繼承的,哪些資源是獨(dú)占的,以及一些其他細(xì)節(jié):
子進(jìn)程是父進(jìn)程的拷貝,在子進(jìn)程的內(nèi)核區(qū) PCB 中,文件描述符也是可以被拷貝的,因此在父進(jìn)程可以使用的文件描述符在子進(jìn)程中也有一份,并且可以使用它們做和父進(jìn)程一樣的事情。
父子進(jìn)程有用各自的獨(dú)立的虛擬地址空間,因此所有的資源都是獨(dú)占的
為了節(jié)省系統(tǒng)資源,對于只有在父進(jìn)程才能用到的資源,可以在子進(jìn)程中將其釋放掉,父進(jìn)程亦如此。
由于需要在父進(jìn)程中做 accept() 操作,并且要釋放子進(jìn)程資源,如果想要更高效一下可以使用信號的方式處理
多進(jìn)程版并發(fā) TCP 服務(wù)器示例代碼如下:
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
//?信號處理函數(shù)
void?callback(int?num)
{
while(1)
{
pid_t?pid?=?waitpid(-1,?NULL,?WNOHANG);
if(pid?<=?0)
{
printf("子進(jìn)程正在運(yùn)行,?或者子進(jìn)程被回收完畢了\n");
break;
}
printf("child?die,?pid?=?%d\n",?pid);
}
}
int?childWork(int?cfd);
int?main()
{
//?1.?創(chuàng)建監(jiān)聽的套接字
int?lfd?=?socket(AF_INET,?SOCK_STREAM,?0);
if(lfd?==?-1)
{
perror("socket");
exit(0);
}
//?2.?將socket()返回值和本地的IP端口綁定到一起
struct?sockaddr_in?addr;
addr.sin_family?=?AF_INET;
addr.sin_port?=?htons(10000);???//?大端端口
//?INADDR_ANY代表本機(jī)的所有IP,?假設(shè)有三個網(wǎng)卡就有三個IP地址
//?這個宏可以代表任意一個IP地址
//?這個宏一般用于本地的綁定操作
addr.sin_addr.s_addr?=?INADDR_ANY;??//?這個宏的值為0?==?0.0.0.0
//????inet_pton(AF_INET,?"192.168.237.131",?&addr.sin_addr.s_addr);
int?ret?=?bind(lfd,?(struct?sockaddr*)&addr,?sizeof(addr));
if(ret?==?-1)
{
perror("bind");
exit(0);
}
//?3.?設(shè)置監(jiān)聽
ret?=?listen(lfd,?128);
if(ret?==?-1)
{
perror("listen");
exit(0);
}
//?注冊信號的捕捉
struct?sigaction?act;
act.sa_flags?=?0;
act.sa_handler?=?callback;
sigemptyset(&act.sa_mask);
sigaction(SIGCHLD,?&act,?NULL);
//?接受多個客戶端連接,?對需要循環(huán)調(diào)用?accept
while(1)
{
//?4.?阻塞等待并接受客戶端連接
struct?sockaddr_in?cliaddr;
int?clilen?=?sizeof(cliaddr);
int?cfd?=?accept(lfd,?(struct?sockaddr*)&cliaddr,?&clilen);
if(cfd?==?-1)
{
if(errno?==?EINTR)
{
//?accept調(diào)用被信號中斷了,?解除阻塞,?返回了-1
//?重新調(diào)用一次accept
continue;
}
perror("accept");
exit(0);
}
//?打印客戶端的地址信息
char?ip[24]?=?{0};
printf("客戶端的IP地址:?%s,?端口:?%d\n",
inet_ntop(AF_INET,?&cliaddr.sin_addr.s_addr,?ip,?sizeof(ip)),
ntohs(cliaddr.sin_port));
//?新的連接已經(jīng)建立了,?創(chuàng)建子進(jìn)程,?讓子進(jìn)程和這個客戶端通信
pid_t?pid?=?fork();
if(pid?==?0)
{
//?子進(jìn)程?->?和客戶端通信
//?通信的文件描述符cfd被拷貝到子進(jìn)程中
//?子進(jìn)程不負(fù)責(zé)監(jiān)聽
close(lfd);
while(1)
{
int?ret?=?childWork(cfd);
if(ret?<=0)
{
break;
}
}
//?退出子進(jìn)程
close(cfd);
exit(0);
}
else?if(pid?>?0)
{
//?父進(jìn)程不和客戶端通信
close(cfd);
}
}
return?0;
}
//?5.?和客戶端通信
int?childWork(int?cfd)
{
//?接收數(shù)據(jù)
char?buf[1024];
memset(buf,?0,?sizeof(buf));
int?len?=?read(cfd,?buf,?sizeof(buf));
if(len?>?0)
{
printf("客戶端say:?%s\n",?buf);
write(cfd,?buf,?len);
}
else?if(len??==?0)
{
printf("客戶端斷開了連接...\n");
}
else
{
perror("read");
}
return?len;
}
在上面的示例代碼中,父子進(jìn)程中分別關(guān)掉了用不到的文件描述符(父進(jìn)程不需要通信,子進(jìn)程也不需要監(jiān)聽)。如果客戶端主動斷開連接,那么服務(wù)器端負(fù)責(zé)和客戶端通信的子進(jìn)程也就退出了,子進(jìn)程退出之后會給父進(jìn)程發(fā)送一個叫做 SIGCHLD 的信號,在父進(jìn)程中通過 sigaction() 函數(shù)捕捉了該信號,通過回調(diào)函數(shù) callback() 中的 waitpid() 對退出的子進(jìn)程進(jìn)行了資源回收。
另外還有一個細(xì)節(jié)要說明一下,這是父進(jìn)程的處理代碼:
int?cfd?=?accept(lfd,?(struct?sockaddr*)&cliaddr,?&clilen);
while(1)
{
int?cfd?=?accept(lfd,?(struct?sockaddr*)&cliaddr,?&clilen);
if(cfd?==?-1)
{
if(errno?==?EINTR)
{
//?accept調(diào)用被信號中斷了,?解除阻塞,?返回了-1
//?重新調(diào)用一次accept
continue;
}
perror("accept");
exit(0);
}
}
如果父進(jìn)程調(diào)用 accept() 函數(shù)沒有檢測到新的客戶端連接,父進(jìn)程就阻塞在這兒了,這時候有子進(jìn)程退出了,發(fā)送信號給父進(jìn)程,父進(jìn)程就捕捉到了這個信號 SIGCHLD, 由于信號的優(yōu)先級很高,會打斷代碼正常的執(zhí)行流程,因此父進(jìn)程的阻塞被中斷,轉(zhuǎn)而去處理這個信號對應(yīng)的函數(shù) callback(),處理完畢,再次回到 accept() 位置,但是這是已經(jīng)無法阻塞了,函數(shù)直接返回 - 1,此時函數(shù)調(diào)用失敗,錯誤描述為 accept: Interrupted system call,對應(yīng)的錯誤號為 EINTR,由于代碼是被信號中斷導(dǎo)致的錯誤,所以可以在程序中對這個錯誤號進(jìn)行判斷,讓父進(jìn)程重新調(diào)用 accept(),繼續(xù)阻塞或者接受客戶端的新連接。
3. 多線程并發(fā)
編寫多線程版的并發(fā)服務(wù)器程序和多進(jìn)程思路差不多,考慮明白了對號入座即可。多線程中的線程有兩大類:主線程(父線程)和子線程,他們分別要在服務(wù)器端處理監(jiān)聽和通信流程。根據(jù)多進(jìn)程的處理思路,就可以這樣設(shè)計了:
主線程:
負(fù)責(zé)監(jiān)聽,處理客戶端的連接請求,也就是在父進(jìn)程中循環(huán)調(diào)用 accept() 函數(shù)
創(chuàng)建子線程:建立一個新的連接,就創(chuàng)建一個新的子進(jìn)程,讓這個子進(jìn)程和對應(yīng)的客戶端通信
回收子線程資源:由于回收需要調(diào)用阻塞函數(shù),這樣就會影響 accept(),直接做線程分離即可。
子線程:
負(fù)責(zé)通信,基于主線程建立新連接之后得到的文件描述符,和對應(yīng)的客戶端完成數(shù)據(jù)的接收和發(fā)送。
發(fā)送數(shù)據(jù):send() / write()
接收數(shù)據(jù):recv() / read()
在多線程版的服務(wù)器端程序中,多個線程共用同一個地址空間,有些數(shù)據(jù)是共享的,有些數(shù)據(jù)的獨(dú)占的,下面來分析一些其中的一些細(xì)節(jié):
同一地址空間中的多個線程的棧空間是獨(dú)占的
多個線程共享全局?jǐn)?shù)據(jù)區(qū),堆區(qū),以及內(nèi)核區(qū)的文件描述符等資源,因此需要注意數(shù)據(jù)覆蓋問題,并且在多個線程訪問共享資源的時候,還需要進(jìn)行線程同步。
多線程版 Tcp 服務(wù)器示例代碼如下:
#include?
#include?
#include?
#include?
#include?
#include?
struct?SockInfo
{
int?fd;??????????????????????//?通信
pthread_t?tid;???????????????//?線程ID
struct?sockaddr_in?addr;?????//?地址信息
};
struct?SockInfo?infos[128];
void*?working(void*?arg)
{
while(1)
{
struct?SockInfo*?info?=?(struct?SockInfo*)arg;
//?接收數(shù)據(jù)
char?buf[1024];
int?ret?=?read(info->fd,?buf,?sizeof(buf));
if(ret?==?0)
{
printf("客戶端已經(jīng)關(guān)閉連接...\n");
info->fd?=?-1;
break;
}
else?if(ret?==?-1)
{
printf("接收數(shù)據(jù)失敗...\n");
info->fd?=?-1;
break;
}
else
{
write(info->fd,?buf,?strlen(buf)+1);
}
}
return?NULL;
}
int?main()
{
//?1.?創(chuàng)建用于監(jiān)聽的套接字
int?fd?=?socket(AF_INET,?SOCK_STREAM,?0);
if(fd?==?-1)
{
perror("socket");
exit(0);
}
//?2.?綁定
struct?sockaddr_in?addr;
addr.sin_family?=?AF_INET;??????????//?ipv4
addr.sin_port?=?htons(8989);????????//?字節(jié)序應(yīng)該是網(wǎng)絡(luò)字節(jié)序
addr.sin_addr.s_addr?=??INADDR_ANY;?//?==?0,?獲取IP的操作交給了內(nèi)核
int?ret?=?bind(fd,?(struct?sockaddr*)&addr,?sizeof(addr));
if(ret?==?-1)
{
perror("bind");
exit(0);
}
//?3.設(shè)置監(jiān)聽
ret?=?listen(fd,?100);
if(ret?==?-1)
{
perror("listen");
exit(0);
}
//?4.?等待,?接受連接請求
int?len?=?sizeof(struct?sockaddr);
//?數(shù)據(jù)初始化
int?max?=?sizeof(infos)?/?sizeof(infos[0]);
for(int?i=0;?i { bzero(&infos[i],?sizeof(infos[i])); infos[i].fd?=?-1; infos[i].tid?=?-1; } //?父進(jìn)程監(jiān)聽,?子進(jìn)程通信 while(1) { //?創(chuàng)建子線程 struct?SockInfo*?pinfo; for(int?i=0;?i { if(infos[i].fd?==?-1) { pinfo?=?&infos[i]; break; } if(i?==?max-1) { sleep(1); i--; } } int?connfd?=?accept(fd,?(struct?sockaddr*)&pinfo->addr,?&len); printf("parent?thread,?connfd:?%d\n",?connfd); if(connfd?==?-1) { perror("accept"); exit(0); } pinfo->fd?=?connfd; pthread_create(&pinfo->tid,?NULL,?working,?pinfo); pthread_detach(pinfo->tid); } //?釋放資源 close(fd);??//?監(jiān)聽 return?0; } 在編寫多線程版并發(fā)服務(wù)器代碼的時候,需要注意父子線程共用同一個地址空間中的文件描述符,因此每當(dāng)在主線程中建立一個新的連接,都需要將得到文件描述符值保存起來,不能在同一變量上進(jìn)行覆蓋,這樣做丟失了之前的文件描述符值也就不知道怎么和客戶端通信了。 在上面示例代碼中是將成功建立連接之后得到的用于通信的文件描述符值保存到了一個全局?jǐn)?shù)組中,每個子線程需要和不同的客戶端通信,需要的文件描述符值也就不一樣,只要保證存儲每個有效文件描述符值的變量對應(yīng)不同的內(nèi)存地址,在使用的時候就不會發(fā)生數(shù)據(jù)覆蓋的現(xiàn)象,造成通信數(shù)據(jù)的混亂了。 文章鏈接:https://subingwen.com/linux/concurrence/ C++ 任務(wù)調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。