java nio 基礎(chǔ)用法
傳統(tǒng)的io模型問題:
在傳統(tǒng)的IO模型中,每個(gè)連接創(chuàng)建成功之后都需要一個(gè)線程來維護(hù),每個(gè)線程包含一個(gè)while死循環(huán),那么1w個(gè)連接對(duì)應(yīng)1w個(gè)線程,繼而1w個(gè)while死循環(huán),這就帶來如下幾個(gè)問題:
線程資源受限:線程是操作系統(tǒng)中非常寶貴的資源,同一時(shí)刻有大量的線程處于阻塞狀態(tài)是非常嚴(yán)重的資源浪費(fèi),操作系統(tǒng)耗不起
線程切換效率低下:?jiǎn)螜C(jī)cpu核數(shù)固定,線程爆炸之后操作系統(tǒng)頻繁進(jìn)行線程切換,應(yīng)用性能急劇下降。
除了以上兩個(gè)問題,IO編程中,我們看到數(shù)據(jù)讀寫是以字節(jié)流為單位,效率不高。
NIO編程模型
NIO編程模型中,新來一個(gè)連接不再創(chuàng)建一個(gè)新的線程,而是可以把這條連接直接綁定到某個(gè)固定的線程,然后這條連接所有的讀寫都由這個(gè)線程來負(fù)責(zé),那么他是怎么做到的?我們用一幅圖來對(duì)比一下IO與NIO
如上圖所示,IO模型中,一個(gè)連接來了,會(huì)創(chuàng)建一個(gè)線程,對(duì)應(yīng)一個(gè)while死循環(huán),死循環(huán)的目的就是不斷監(jiān)測(cè)這條連接上是否有數(shù)據(jù)可以讀,大多數(shù)情況下,1w個(gè)連接里面同一時(shí)刻只有少量的連接有數(shù)據(jù)可讀,因此,很多個(gè)while死循環(huán)都白白浪費(fèi)掉了,因?yàn)樽x不出啥數(shù)據(jù)。
而在NIO模型中,他把這么多while死循環(huán)變成一個(gè)死循環(huán),這個(gè)死循環(huán)由一個(gè)線程控制,那么他又是如何做到一個(gè)線程,一個(gè)while死循環(huán)就能監(jiān)測(cè)1w個(gè)連接是否有數(shù)據(jù)可讀的呢?
這就是NIO模型中selector的作用,一條連接來了之后,現(xiàn)在不創(chuàng)建一個(gè)while死循環(huán)去監(jiān)聽是否有數(shù)據(jù)可讀了,而是直接把這條連接注冊(cè)到selector上,然后,通過檢查這個(gè)selector,就可以批量監(jiān)測(cè)出有數(shù)據(jù)可讀的連接,進(jìn)而讀取數(shù)據(jù),下面我再舉個(gè)非常簡(jiǎn)單的生活中的例子說明IO與NIO的區(qū)別。
在一家幼兒園里,小朋友有上廁所的需求,小朋友都太小以至于你要問他要不要上廁所,他才會(huì)告訴你。幼兒園一共有100個(gè)小朋友,有兩種方案可以解決小朋友上廁所的問題:
每個(gè)小朋友配一個(gè)老師。每個(gè)老師隔段時(shí)間詢問小朋友是否要上廁所,如果要上,就領(lǐng)他去廁所,100個(gè)小朋友就需要100個(gè)老師來詢問,并且每個(gè)小朋友上廁所的時(shí)候都需要一個(gè)老師領(lǐng)著他去上,這就是IO模型,一個(gè)連接對(duì)應(yīng)一個(gè)線程。
所有的小朋友都配同一個(gè)老師。這個(gè)老師隔段時(shí)間詢問所有的小朋友是否有人要上廁所,然后每一時(shí)刻把所有要上廁所的小朋友批量領(lǐng)到廁所,這就是NIO模型,所有小朋友都注冊(cè)到同一個(gè)老師,對(duì)應(yīng)的就是所有的連接都注冊(cè)到一個(gè)線程,然后批量輪詢。
這就是NIO模型解決線程資源受限的方案,實(shí)際開發(fā)過程中,我們會(huì)開多個(gè)線程,每個(gè)線程都管理著一批連接,相對(duì)于IO模型中一個(gè)線程管理一條連接,消耗的線程資源大幅減少
由于NIO模型中線程數(shù)量大大降低,線程切換效率因此也大幅度提高
NIO解決這個(gè)問題的方式是數(shù)據(jù)讀寫不再以字節(jié)為單位,而是以字節(jié)塊為單位。IO模型中,每次都是從操作系統(tǒng)底層一個(gè)字節(jié)一個(gè)字節(jié)地讀取數(shù)據(jù),而NIO維護(hù)一個(gè)緩沖區(qū),每次可以從這個(gè)緩沖區(qū)里面讀取一塊的數(shù)據(jù),
這就好比一盤美味的豆子放在你面前,你用筷子一個(gè)個(gè)夾(每次一個(gè)),肯定不如要勺子挖著吃(每次一批)效率來得高。
一、Buffer demo
public class BufferTest {
public static void main(String[] args) {
//靜態(tài)方法常見 buffer
IntBuffer buf = IntBuffer.allocate(10);
int[] array = new int[]{3, 5, 1};
//put一個(gè)數(shù)組到buffer中,使用put方式將
// buf.put(array);
//使用wrap方式會(huì)直接更改原數(shù)組
buf = buf.wrap(array);
//IntBuffer.wrap(array, 0, 2);
buf.put(0, 7);
int length = buf.limit();
for (int i = 0; i < length; i++) {
System.out.print(buf.get(i));
}
for (int i = 0; i < array.length; i++) {
System.out.print(array[i]);
}
System.out.println(buf);
/**
* limit = position;
* position = 0;
*/
buf.flip();
/**
* position = 0;
* limit = capacity;
*/
buf.clear();
System.out.println(buf);
//創(chuàng)建一個(gè)新的字節(jié)緩沖區(qū),共享此緩沖區(qū)的內(nèi)容
IntBuffer newBuffer = buf.duplicate();
System.out.println(newBuffer);
}
}
二、FileChannel demo
public class FileChannelTest {
public static void testFileChannel() throws IOException {
RandomAccessFile aFile = new RandomAccessFile("D:/nio-data.txt", "rw");
FileChannel channel = aFile.getChannel();
//分配一個(gè)新的緩沖區(qū)
ByteBuffer allocate = ByteBuffer.allocate(48);
int bytesRead = channel.read(allocate);
while (bytesRead != -1) {
System.out.println("Read " + bytesRead);
allocate.flip();
while (allocate.hasRemaining()) {
System.out.print((char) allocate.get());
}
allocate.clear();
bytesRead = channel.read(allocate);
}
aFile.close();
}
public static void fileChannelDemo() throws IOException {
//定義一個(gè)byteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
FileChannel inputChannel = new FileInputStream("D:/nio-data.txt").getChannel();
FileChannel outputChannel = new FileOutputStream("D:/nio-data.txt", true).getChannel();
//讀取數(shù)據(jù)
byteBuffer.clear();
int len = inputChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(), "UTF-8"));
System.out.println(new String(byteBuffer.array(), 0, len, "UTF-8"));
ByteBuffer byteBuffer2 = ByteBuffer.wrap("奧會(huì)計(jì)師八度空間".getBytes());
outputChannel.write(byteBuffer2);
outputChannel.close();
inputChannel.close();
}
public static void main(String[] args) {
try {
FileChannelTest.fileChannelDemo();
} catch (Exception e) {
e.printStackTrace();
}
}
}
三、不使用 選擇器 selector 的 ServerSocketChannel 和 SocketChannel? 的demo
服務(wù)端:
public class NioChannelServer {
private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//獲取一個(gè)intBuffer視圖,操作視圖的同時(shí)原緩沖區(qū)也會(huì)改變
private IntBuffer intBuffer = byteBuffer.asIntBuffer();
private SocketChannel socketChannel = null;
private ServerSocketChannel serverSocketChannel = null;
/**
* 打開服務(wù)端的通道
*
* @throws Exception
*/
public void openChannel() throws Exception {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
System.out.println("服務(wù)端通道已經(jīng)打開");
}
/**
* 等待新的連接
*
* @throws Exception
*/
public void waitReqConn() throws Exception {
while (true) {
socketChannel = serverSocketChannel.accept();
if (null != socketChannel) {
System.out.println("新的連接加入!");
}
//處理請(qǐng)求
processReq();
socketChannel.close();
}
}
private void processReq() throws IOException {
System.out.println("開始讀取和處理客戶端數(shù)據(jù)。。");
byteBuffer.clear();
socketChannel.read(byteBuffer);
int result = intBuffer.get(0) + intBuffer.get(1);
byteBuffer.flip();
byteBuffer.clear();
//修改視圖,byteBuffer也會(huì)變化
intBuffer.put(0, result);
socketChannel.write(byteBuffer);
System.out.println("讀取處理完成");
}
public void start() {
try {
//打開通道
openChannel();
//等待客戶端連接
waitReqConn();
socketChannel.close();
System.out.println("服務(wù)端處理完畢");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args){
new NioChannelServer().start();
}
}
客戶端:
public class NioChannelClient {
private SocketChannel socketChannel = null;
private ByteBuffer buff = ByteBuffer.allocate(8);
private IntBuffer intBuffer = buff.asIntBuffer();
public SocketChannel connect() throws IOException {
return SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
}
public int getSum(int a, int b) {
int result = 0;
try {
socketChannel = connect();
sendRequest(a, b);
result = receiveResult();
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
private int receiveResult() throws IOException {
buff.clear();
socketChannel.read(buff);
return intBuffer.get(0);
}
private void sendRequest(int a, int b) throws IOException {
buff.clear();
intBuffer.put(0,a);
intBuffer.put(1,b);
socketChannel.write(buff);
System.out.println("客戶端發(fā)送請(qǐng)求 ("+a+"+"+b+")");
}
public static void main(String[] args){
Random random = new Random();
for (int i = 0; i <10 ; i++) {
int result = new NioChannelClient().getSum(random.nextInt(100),random.nextInt(100));
System.out.println(result);
}
}
}
四、使用 selector 方式 實(shí)現(xiàn) ServerSocketChannel? 和 SocketChannel
選擇器(Selector) 是 SelectableChannle 對(duì)象的多路復(fù)用器,Selector 可以同時(shí)監(jiān)控多個(gè) SelectableChannel 的 IO 狀況,也就是說,利用 Selector可使一個(gè)單獨(dú)的線程管理多個(gè) Channel,selector 是非阻塞 IO 的核心。
選擇器(Selector)的應(yīng)用:
當(dāng)通道使用register(Selector sel, int ops)方法將通道注冊(cè)選擇器時(shí),選擇器對(duì)通道事件進(jìn)行監(jiān)聽,通過第二個(gè)參數(shù)指定監(jiān)聽的事件類型。
其中可監(jiān)聽的事件類型包括以下:
讀 : SelectionKey.OP_READ (1)
寫 : SelectionKey.OP_WRITE (4)
連接 : SelectionKey.OP_CONNECT (8)
接收 : SelectionKey.OP_ACCEPT (16)
如果需要監(jiān)聽多個(gè)事件是:
int key =?SelectionKey.OP_READ |?SelectionKey.OP_WRITE ; //表示同時(shí)監(jiān)聽讀寫操作
服務(wù)端:
public class SelectorServer {
private Selector selector = null;
private ServerSocketChannel serverSocketChannel = null;
private int keys = 0;
public void initServer() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8888));
serverSocketChannel.configureBlocking(false);
//服務(wù)端通道注冊(cè)accept事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
private void listen() throws IOException {
System.out.println("服務(wù)端已經(jīng)啟動(dòng)");
while (true) {
//讓通道選擇器至少選擇一個(gè)通道,阻塞的方法
keys = this.selector.select();
//selector.wakeup();//可以喚醒阻塞的select()方法
//設(shè)置超時(shí)時(shí)間,非阻塞
//this.selector.select(1000);
System.out.println(keys);
Iterator
if (keys > 0) {
//進(jìn)行輪詢
while (itor.hasNext()) {
try{
SelectionKey key = itor.next();
if (key.isAcceptable()) {
//serverSocketChannel = (ServerSocketChannel) key.channel();
//獲取和客戶端連接的服務(wù)端渠道
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
channel.write(ByteBuffer.wrap("hello".getBytes()));
//還需要讀取客戶端發(fā)過來的數(shù)據(jù),所以需要注冊(cè)一個(gè)讀取數(shù)據(jù)的事件
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
read(key);
}
}finally {
//處理完一個(gè)key,就刪除,防止重復(fù)處理
itor.remove();
}
}
} else {
System.out.println("select finished without any keys");
}
}
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(byteBuffer);
String msg = new String(byteBuffer.array(), 0, len);
System.out.println("服務(wù)端接收到的消息是" + msg);
}
public void start() {
try {
initServer();
listen();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new SelectorServer().start();
}
}
客戶端:
public class SelectorClient {
private Selector selector;
private ByteBuffer outBuffer = ByteBuffer.allocate(1024);
private ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
private int keys = 0;
private SocketChannel socketChannel = null;
public void initClient() throws IOException {
selector = Selector.open();
socketChannel = SocketChannel.open();
//客戶端通道配置為非阻塞
socketChannel.configureBlocking(false);
//連接服務(wù)端
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
//注冊(cè)客戶端連接服務(wù)器的事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
private void listen() throws IOException {
while (true) {
keys = this.selector.select();
System.out.println(keys);
if (keys > 0) {
Iterator
while (iter.hasNext()) {
try{
SelectionKey key = iter.next();
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
System.out.println("完成連接");
}
//連接完成之后,肯定還要做其它的事情,比如寫
channel.register(selector, SelectionKey.OP_WRITE);
} else if (key.isWritable()) {
SocketChannel channel = (SocketChannel) key.channel();
outBuffer.clear();
System.out.println("客戶端正在寫數(shù)據(jù)。。");
//從控制臺(tái)寫消息
Scanner scanner = new Scanner(System.in);
while (true) {
String msg = scanner.next();
channel.write(ByteBuffer.wrap(msg.getBytes()));
if("end".equals(msg)) {
break;
}
}
channel.register(selector, SelectionKey.OP_READ);
System.out.println("客戶端寫數(shù)據(jù)完成。。。");
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
inputBuffer.clear();
int len = socketChannel.read(inputBuffer);
System.out.println("讀取服務(wù)端發(fā)送的消息:" + new String(inputBuffer.array()));
}
}finally{
iter.remove();
}
}
} else {
System.out.println("select finished without any keys");
}
}
}
public void start() {
try {
initClient();
listen();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args){
new SelectorClient().start();
}
}
nio的非阻塞是對(duì)于網(wǎng)絡(luò)通道來說的,需要使用Channel.configureBlocking(false)來設(shè)置通道為非阻塞的,如果沒設(shè)置,默認(rèn)是阻塞的。
Java 任務(wù)調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。