NettyJavaNIO編程模型介紹02

      網(wǎng)友投稿 685 2022-05-30

      因為篇幅問題我們繼續(xù)上一篇的內(nèi)容繼續(xù)。

      一、NIO網(wǎng)絡(luò)編程原理分析

      NIO 非阻塞 網(wǎng)絡(luò)編程相關(guān)的(Selector、SelectionKey、ServerScoketChannel和SocketChannel) 關(guān)系梳理圖

      對上圖的說明:

      當(dāng)客戶端連接時,會通過ServerSocketChannel 得到 SocketChannel

      Selector 進行監(jiān)聽 select 方法, 返回有事件發(fā)生的通道的個數(shù).

      將socketChannel注冊到Selector上, register(Selector sel, int ops), 一個selector上可以注冊多個SocketChannel

      注冊后返回一個 SelectionKey, 會和該Selector 關(guān)聯(lián)(集合)

      進一步得到各個 SelectionKey (有事件發(fā)生)

      在通過 SelectionKey 反向獲取 SocketChannel , 方法 channel()

      可以通過 得到的 channel , 完成業(yè)務(wù)處理

      二、NIO網(wǎng)絡(luò)編程快速入門

      接下來我們通過具體的案例代碼來實現(xiàn)網(wǎng)絡(luò)通信

      服務(wù)端:

      package com.dpb.netty.nio; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @program: netty4demo * @description: Nio的服務(wù)端 * @author: 波波烤鴨 * @create: 2019-12-28 14:17 */ public class NioServer { public static void main(String[] args) throws Exception{ //創(chuàng)建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一個Selecor對象 Selector selector = Selector.open(); //綁定一個端口6666, 在服務(wù)器端監(jiān)聽 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //設(shè)置為非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注冊到 selector 關(guān)心 事件為 OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("注冊后的selectionkey 數(shù)量=" + selector.keys().size()); // 1 //循環(huán)等待客戶端連接 while (true) { //這里我們等待1秒,如果沒有事件發(fā)生, 返回 if(selector.select(1000) == 0) { //沒有事件發(fā)生 System.out.println("服務(wù)器等待了1秒,無連接"); continue; } //如果返回的>0, 就獲取到相關(guān)的 selectionKey集合 //1.如果返回的>0, 表示已經(jīng)獲取到關(guān)注的事件 //2. selector.selectedKeys() 返回關(guān)注事件的集合 // 通過 selectionKeys 反向獲取通道 Set selectionKeys = selector.selectedKeys(); System.out.println("selectionKeys 數(shù)量 = " + selectionKeys.size()); //遍歷 Set, 使用迭代器遍歷 Iterator keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { //獲取到SelectionKey SelectionKey key = keyIterator.next(); //根據(jù)key 對應(yīng)的通道發(fā)生的事件做相應(yīng)處理 if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客戶端連接 //該該客戶端生成一個 SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("客戶端連接成功 生成了一個 socketChannel " + socketChannel.hashCode()); //將 SocketChannel 設(shè)置為非阻塞 socketChannel.configureBlocking(false); //將socketChannel 注冊到selector, 關(guān)注事件為 OP_READ, 同時給socketChannel //關(guān)聯(lián)一個Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("客戶端連接后 ,注冊的selectionkey 數(shù)量=" + selector.keys().size()); //2,3,4.. } if(key.isReadable()) { //發(fā)生 OP_READ //通過key 反向獲取到對應(yīng)channel SocketChannel channel = (SocketChannel)key.channel(); //獲取到該channel關(guān)聯(lián)的buffer ByteBuffer buffer = (ByteBuffer)key.attachment(); channel.read(buffer); System.out.println("form 客戶端 " + new String(buffer.array(), CharsetUtil.UTF_8)); } //手動從集合中移動當(dāng)前的selectionKey, 防止重復(fù)操作 keyIterator.remove(); } } } }

      客戶端代碼

      package com.dpb.netty.nio; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** * @program: netty4demo * @description: NIO的客戶端 * @author: 波波烤鴨 * @create: 2019-12-28 14:18 */ public class NioClient { public static void main(String[] args) throws Exception{ //得到一個網(wǎng)絡(luò)通道 SocketChannel socketChannel = SocketChannel.open(); //設(shè)置非阻塞 socketChannel.configureBlocking(false); //提供服務(wù)器端的ip 和 端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); //連接服務(wù)器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("因為連接需要時間,客戶端不會阻塞,可以做其它工作.."); } } //...如果連接成功,就發(fā)送數(shù)據(jù) String str = "hello, bobo烤鴨~"; //Wraps a byte array into a buffer ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(CharsetUtil.UTF_8)); //發(fā)送數(shù)據(jù),將 buffer 數(shù)據(jù)寫入 channel socketChannel.write(buffer); System.in.read(); } }

      效果

      三、SelectionKey

      SelectionKey,表示 Selector 和網(wǎng)絡(luò)通道的注冊關(guān)系, 共四種

      int OP_ACCEPT:有新的網(wǎng)絡(luò)連接可以 accept,值為 16

      int OP_CONNECT:代表連接已經(jīng)建立,值為 8

      int OP_READ:代表讀操作,值為 1

      int OP_WRITE:代表寫操作,值為 4

      源碼中:

      public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;

      SelectionKey相關(guān)方法

      public abstract class SelectionKey { public abstract Selector selector();//得到與之關(guān)聯(lián)的 Selector 對象 public abstract SelectableChannel channel();//得到與之關(guān)聯(lián)的通道 public final Object attachment();//得到與之關(guān)聯(lián)的共享數(shù)據(jù) public abstract SelectionKey interestOps(int ops);//設(shè)置或改變監(jiān)聽事件 public final boolean isAcceptable();//是否可以 accept public final boolean isReadable();//是否可以讀 public final boolean isWritable();//是否可以寫 }

      四、ServerSocketChannel

      Netty之JavaNIO編程模型介紹02

      ServerSocketChannel 在服務(wù)器端監(jiān)聽新的客戶端 Socket 連接

      相關(guān)方法如下

      public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{ //得到一個 ServerSocketChannel 通道 public static ServerSocketChannel open(); //設(shè)置服務(wù)器端端口號 public final ServerSocketChannel bind(SocketAddress local); //設(shè)置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式 public final SelectableChannel configureBlocking(boolean block); //接受一個連接,返回代表這個連接的通道對象 public SocketChannel accept(); //注冊一個選擇器并設(shè)置監(jiān)聽事件 public final SelectionKey register(Selector sel, int ops); }

      五、SocketChannel

      SocketChannel,網(wǎng)絡(luò) IO 通道,具體負責(zé)進行讀寫操作。NIO 把緩沖區(qū)的數(shù)據(jù)寫入通道,或者把通道里的數(shù)據(jù)讀到緩沖區(qū)。

      相關(guān)方法如下

      public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{ //得到一個 SocketChannel 通道 public static SocketChannel open(); //設(shè)置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式 public final SelectableChannel configureBlocking(boolean block); //連接服務(wù)器 public boolean connect(SocketAddress remote); //如果上面的方法連接失敗,接下來就要通過該方法完成連接操作 public boolean finishConnect(); public int write(ByteBuffer src);//往通道里寫數(shù)據(jù) public int read(ByteBuffer dst);//從通道里讀數(shù)據(jù) //注冊一個選擇器并設(shè)置監(jiān)聽事件,最后一個參數(shù)可以設(shè)置共享數(shù)據(jù) public final SelectionKey register(Selector sel, int ops, Object att); public final void close();//關(guān)閉通道 }

      六、群聊系統(tǒng)

      接下來提供一個群聊系統(tǒng)的案例的簡單代碼。

      編寫一個 NIO 群聊系統(tǒng),實現(xiàn)服務(wù)器端和客戶端之間的數(shù)據(jù)簡單通訊(非阻塞)

      實現(xiàn)多人群聊

      服務(wù)器端:可以監(jiān)測用戶上線,離線,并實現(xiàn)消息轉(zhuǎn)發(fā)功能

      客戶端:通過channel 可以無阻塞發(fā)送消息給其它所有用戶,同時可以接受其它用戶發(fā)送的消息(有服務(wù)器轉(zhuǎn)發(fā)得到)

      目的:進一步理解NIO非阻塞網(wǎng)絡(luò)編程機制

      服務(wù)端代碼

      package com.dpb.netty.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; public class GroupChatServer { //定義屬性 private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; //構(gòu)造器 //初始化工作 public GroupChatServer() { try { //得到選擇器 selector = Selector.open(); //ServerSocketChannel listenChannel = ServerSocketChannel.open(); //綁定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //設(shè)置非阻塞模式 listenChannel.configureBlocking(false); //將該listenChannel 注冊到selector listenChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (IOException e) { e.printStackTrace(); } } //監(jiān)聽 public void listen() { System.out.println("監(jiān)聽線程: " + Thread.currentThread().getName()); try { //循環(huán)處理 while (true) { int count = selector.select(); if(count > 0) {//有事件處理 //遍歷得到selectionKey 集合 Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //取出selectionkey SelectionKey key = iterator.next(); //監(jiān)聽到accept if(key.isAcceptable()) { SocketChannel sc = listenChannel.accept(); sc.configureBlocking(false); //將該 sc 注冊到seletor sc.register(selector, SelectionKey.OP_READ); //提示 System.out.println(sc.getRemoteAddress() + " 上線 "); } if(key.isReadable()) { //通道發(fā)送read事件,即通道是可讀的狀態(tài) //處理讀 (專門寫方法..) readData(key); } //當(dāng)前的key 刪除,防止重復(fù)處理 iterator.remove(); } } else { System.out.println("等待...."); } } }catch (Exception e) { e.printStackTrace(); }finally { //發(fā)生異常處理.... } } //讀取客戶端消息 private void readData(SelectionKey key) { //取到關(guān)聯(lián)的channle SocketChannel channel = null; try { //得到channel channel = (SocketChannel) key.channel(); //創(chuàng)建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); //根據(jù)count的值做處理 if(count > 0) { //把緩存區(qū)的數(shù)據(jù)轉(zhuǎn)成字符串 String msg = new String(buffer.array()); //輸出該消息 System.out.println("form 客戶端: " + msg); //向其它的客戶端轉(zhuǎn)發(fā)消息(去掉自己), 專門寫一個方法來處理 sendInfoToOtherClients(msg, channel); } }catch (IOException e) { try { System.out.println(channel.getRemoteAddress() + " 離線了.."); //取消注冊 key.cancel(); //關(guān)閉通道 channel.close(); }catch (IOException e2) { e2.printStackTrace();; } } } //轉(zhuǎn)發(fā)消息給其它客戶(通道) private void sendInfoToOtherClients(String msg, SocketChannel self ) throws IOException{ System.out.println("服務(wù)器轉(zhuǎn)發(fā)消息中..."); System.out.println("服務(wù)器轉(zhuǎn)發(fā)數(shù)據(jù)給客戶端線程: " + Thread.currentThread().getName()); //遍歷 所有注冊到selector 上的 SocketChannel,并排除 self for(SelectionKey key: selector.keys()) { //通過 key 取出對應(yīng)的 SocketChannel Channel targetChannel = key.channel(); //排除自己 if(targetChannel instanceof SocketChannel && targetChannel != self) { //轉(zhuǎn)型 SocketChannel dest = (SocketChannel)targetChannel; //將msg 存儲到buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //將buffer 的數(shù)據(jù)寫入 通道 dest.write(buffer); } } } public static void main(String[] args) { //創(chuàng)建服務(wù)器對象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } } //可以寫一個Handler class MyHandler { public void readData() { } public void sendInfoToOtherClients(){ } }

      客戶端代碼

      package com.dpb.netty.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class GroupChatClient { //定義相關(guān)的屬性 private final String HOST = "127.0.0.1"; // 服務(wù)器的ip private final int PORT = 6667; //服務(wù)器端口 private Selector selector; private SocketChannel socketChannel; private String username; //構(gòu)造器, 完成初始化工作 public GroupChatClient() throws IOException { selector = Selector.open(); //連接服務(wù)器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); //設(shè)置非阻塞 socketChannel.configureBlocking(false); //將channel 注冊到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok..."); } //向服務(wù)器發(fā)送消息 public void sendInfo(String info) { info = username + " 說:" + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); }catch (IOException e) { e.printStackTrace(); } } //讀取從服務(wù)器端回復(fù)的消息 public void readInfo() { try { int readChannels = selector.select(); if(readChannels > 0) {//有可以用的通道 Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if(key.isReadable()) { //得到相關(guān)的通道 SocketChannel sc = (SocketChannel) key.channel(); //得到一個Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取 sc.read(buffer); //把讀到的緩沖區(qū)的數(shù)據(jù)轉(zhuǎn)成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove(); //刪除當(dāng)前的selectionKey, 防止重復(fù)操作 } else { //System.out.println("沒有可以用的通道..."); } }catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { //啟動我們客戶端 GroupChatClient chatClient = new GroupChatClient(); //啟動一個線程, 每個3秒,讀取從服務(wù)器發(fā)送數(shù)據(jù) new Thread() { public void run() { while (true) { chatClient.readInfo(); try { Thread.currentThread().sleep(3000); }catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //發(fā)送數(shù)據(jù)給服務(wù)器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendInfo(s); } } }

      效果

      七、NIO與零拷貝

      零拷貝基本介紹

      零拷貝是網(wǎng)絡(luò)編程的關(guān)鍵,很多性能優(yōu)化都離不開。

      在 Java 程序中,常用的零拷貝有 mmap(內(nèi)存映射) 和 sendFile。那么,他們在 OS 里,到底是怎么樣的一個的設(shè)計?我們分析 mmap 和 sendFile 這兩個零拷貝

      另外我們看下NIO 中如何使用零拷貝

      傳統(tǒng)IO數(shù)據(jù)讀寫

      Java 傳統(tǒng) IO 和 網(wǎng)絡(luò)編程的一段代碼

      File file = new File("test.txt"); RandomAccessFile raf = new RandomAccessFile(file, "rw"); byte[] arr = new byte[(int) file.length()]; raf.read(arr); Socket socket = new ServerSocket(8080).accept(); socket.getOutputStream().write(arr);

      DMA: direct

      memory access

      直接內(nèi)存拷貝(不使用CPU)

      mmap 優(yōu)化

      mmap 通過內(nèi)存映射,將文件映射到內(nèi)核緩沖區(qū),同時,用戶空間可以共享內(nèi)核空間的數(shù)據(jù)。這樣,在進行網(wǎng)絡(luò)傳輸時,就可以減少內(nèi)核空間到用戶控件的拷貝次數(shù)。如下圖

      mmap示意圖

      sendFile 優(yōu)化

      Linux 2.1 版本 提供了 sendFile 函數(shù),其基本原理如下:數(shù)據(jù)根本不經(jīng)過用戶態(tài),直接從內(nèi)核緩沖區(qū)進入到 Socket Buffer,同時,由于和用戶態(tài)完全無關(guān),就減少了一次上下文切換

      示意圖和小結(jié)

      提示:零拷貝從操作系統(tǒng)角度,是沒有cpu 拷貝

      Linux 在 2.4 版本中,做了一些修改,避免了從內(nèi)核緩沖區(qū)拷貝到 Socket buffer 的操作,直接拷貝到協(xié)議棧,從而再一次減少了數(shù)據(jù)拷貝。具體如下圖和小結(jié):

      這里其實有 一次cpu 拷貝

      kernel buffer -> socket buffer

      但是,拷貝的信息很少,比如

      lenght , offset , 消耗低,可以忽略

      零拷貝的再次理解

      我們說零拷貝,是從操作系統(tǒng)的角度來說的。因為內(nèi)核緩沖區(qū)之間,沒有數(shù)據(jù)是重復(fù)的(只有 kernel buffer 有一份數(shù)據(jù))。

      零拷貝不僅僅帶來更少的數(shù)據(jù)復(fù)制,還能帶來其他的性能優(yōu)勢,例如更少的上下文切換,更少的 CPU 緩存?zhèn)喂蚕硪约盁o CPU 校驗和計算。

      mmap 和 sendFile 的區(qū)別

      mmap 適合小數(shù)據(jù)量讀寫,sendFile 適合大文件傳輸。

      mmap 需要 4 次上下文切換,3 次數(shù)據(jù)拷貝;sendFile 需要 3 次上下文切換,最少 2 次數(shù)據(jù)拷貝。

      sendFile 可以利用 DMA 方式,減少 CPU 拷貝,mmap 則不能(必須從內(nèi)核拷貝到 Socket 緩沖區(qū))。

      NIO 零拷貝案例

      NewIOServer

      package com.dpb.netty.nio.zerocopy; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; //服務(wù)器 public class NewIOServer { public static void main(String[] args) throws Exception { InetSocketAddress address = new InetSocketAddress(7001); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(address); //創(chuàng)建buffer ByteBuffer byteBuffer = ByteBuffer.allocate(4096); while (true) { SocketChannel socketChannel = serverSocketChannel.accept(); int readcount = 0; while (-1 != readcount) { try { readcount = socketChannel.read(byteBuffer); }catch (Exception ex) { // ex.printStackTrace(); break; } // byteBuffer.rewind(); //倒帶 position = 0 mark 作廢 } } } }

      NewIOClient

      package com.dpb.netty.nio.zerocopy; import java.io.FileInputStream; import java.net.InetSocketAddress; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; public class NewIOClient { public static void main(String[] args) throws Exception { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("localhost", 7001)); String filename = "protoc-3.6.1-win32.zip"; //得到一個文件channel FileChannel fileChannel = new FileInputStream(filename).getChannel(); //準備發(fā)送 long startTime = System.currentTimeMillis(); //在linux下一個transferTo 方法就可以完成傳輸 //在windows 下 一次調(diào)用 transferTo 只能發(fā)送8m , 就需要分段傳輸文件, 而且要主要 //傳輸時的位置 =》 課后思考... //transferTo 底層使用到零拷貝 long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel); System.out.println("發(fā)送的總的字節(jié)數(shù) =" + transferCount + " 耗時:" + (System.currentTimeMillis() - startTime)); //關(guān)閉 fileChannel.close(); } }

      好了本文就介紹到此~

      Java 網(wǎng)絡(luò)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:初識"跨域請求"和"同源策略"
      下一篇:【云速建站】頁面產(chǎn)品維護簡述
      相關(guān)文章
      亚洲成年轻人电影网站www| 亚洲一卡2卡3卡4卡5卡6卡| 精品久久久久久亚洲综合网| 亚洲字幕AV一区二区三区四区| 亚洲经典在线观看| 亚洲黄色一级毛片| 中文字幕亚洲综合久久2| 精品亚洲成a人片在线观看| 久久青青草原亚洲av无码app| 亚洲成人中文字幕| 久久精品国产亚洲AV香蕉| 亚洲美女视频网站| 亚洲国产精品综合久久2007| 亚洲av片不卡无码久久| 亚洲三级在线观看| 亚洲欧美熟妇综合久久久久| 亚洲av中文无码乱人伦在线观看| 亚洲av无码一区二区三区在线播放| 无码天堂亚洲国产AV| 亚洲熟妇少妇任你躁在线观看无码 | 亚洲国产精品不卡在线电影| 久久久久久亚洲AV无码专区| 亚洲狠狠ady亚洲精品大秀| 亚洲伊人久久大香线蕉在观| 亚洲视频无码高清在线| 九九精品国产亚洲AV日韩| 亚洲成a人片在线观看久| 亚洲高清免费视频| 最新国产AV无码专区亚洲| 精品久久香蕉国产线看观看亚洲| 亚洲AV日韩精品久久久久| 亚洲第一成年网站大全亚洲| 亚洲偷自精品三十六区| 亚洲欧洲无卡二区视頻| 亚洲精品黄色视频在线观看免费资源 | 亚洲综合在线成人一区| 日韩亚洲人成在线| 亚洲国产成人久久综合一区77 | 久久久久亚洲精品中文字幕| 久久久久亚洲av无码专区蜜芽| 亚洲视频一区二区三区|