更新時(shí)間:2022-11-18 來(lái)源:黑馬程序員 瀏覽量:
NIO概述
NIO介紹
傳統(tǒng)IO流(java.io):讀寫(xiě)操作結(jié)束前,處于線性阻塞,代碼簡(jiǎn)單,安全,性能低
NIO:支持非阻塞式編程,性能更有優(yōu)勢(shì),但代碼編寫(xiě)較為復(fù)雜。
概念理解
同步(synchronous):一條線程執(zhí)行期間,其他線程就只能等待。
異步(asynchronous):一條線程在執(zhí)行期間,其他線程無(wú)需等待。
阻塞(blocking):當(dāng)前任務(wù)未執(zhí)行結(jié)束,會(huì)阻礙后續(xù)任務(wù)的執(zhí)行。
非阻塞(non-blocking):當(dāng)前任務(wù)未執(zhí)行結(jié)束,不會(huì)阻礙后續(xù)任務(wù)的執(zhí)行。
IO流與NIO的區(qū)別
NIO是面向緩沖區(qū),IO 面向流。
NIO是非阻塞的,IO是阻塞的。
NIO可以使用選擇器,IO不涉及選擇器。
NIO組成
Buffer(緩沖區(qū),負(fù)責(zé)讀寫(xiě)數(shù)據(jù),類似火車)
Channel(通道 ,負(fù)責(zé)傳輸,類似鐵軌)
Selector(選擇器,負(fù)責(zé)調(diào)度通道,類似指揮中心)
Buffer
介紹
理解:實(shí)質(zhì)相當(dāng)于普通IO流中的數(shù)組,負(fù)責(zé)數(shù)據(jù)的存和取。但是它提供了對(duì)數(shù)據(jù)的結(jié)構(gòu)化訪問(wèn),可以跟蹤系統(tǒng)的讀、寫(xiě)進(jìn)程。
常見(jiàn)分類:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。
核心屬性
capacity:代表緩沖區(qū)的最大容量。
limit:代表剩余(可存入/可讀取)數(shù)量
position:代表(存入/讀取)位置
mark:標(biāo)記當(dāng)前position的位置。
四個(gè)屬性關(guān)系:mark <= position <= limit <= capacity
構(gòu)造方法(以ByteBuffer為例)
static ByteBuffer allocate(int capacity)分配一個(gè)新的字節(jié)緩沖區(qū)。
static ByteBuffer allocateDirect(int capacity) 分配新的直接字節(jié)緩沖區(qū)。
static ByteBuffer wrap(byte[] array)將 byte 數(shù)組包裝到緩沖區(qū)中。
常用方法
獲取屬性值
capacity():獲取緩沖區(qū)的最大容量。
limit():獲取剩余(可存入/可讀取)數(shù)量。
position():獲取(存入/讀取)位置。
mark():標(biāo)記當(dāng)前position的位置。
存取數(shù)據(jù)
put(Xxx[] xxx) 存入數(shù)據(jù)到緩沖區(qū)中,position >= limit不可寫(xiě)。
get() 獲取緩沖區(qū)的position位置數(shù)據(jù),并將position后移,position >= limit不可讀。
核心方法
flip()翻轉(zhuǎn)此緩沖區(qū)(limit=capacity-postion,postion=0),清除標(biāo)記,用于讀取模式。
clear()清除此緩沖區(qū)(limit=capacity,postion=0),清除標(biāo)記,用于寫(xiě)入模式。
rewind() 倒回這個(gè)緩沖區(qū)(position=0),清除標(biāo)記。
reset() 將此緩沖區(qū)的位置重置為先前標(biāo)記的位置(position=mark)。
演示代碼
public class Test01Buffer { public static void main(String[] args) { //新建緩沖區(qū)對(duì)象(默認(rèn)為寫(xiě)入模式) ByteBuffer b = ByteBuffer.allocate(10); //寫(xiě)入數(shù)據(jù) System.out.println("====寫(xiě)入模式屬性狀態(tài)===="); showProperty(b); System.out.println("====讀取模式屬性狀態(tài)===="); //切換為讀取模式 b.flip(); showProperty(b); System.out.println("====寫(xiě)入數(shù)據(jù)===="); b.clear(); b.put(new byte[]{1,2}); showProperty(b); System.out.println("====讀取數(shù)據(jù)===="); b.flip(); System.out.println("position-------->" + b.position() + ",get:" + b.get()); //循環(huán)遍歷通用格式 //while (b.position()<b.limit()){ // System.out.println("position-------->" + b.position() + ",get:" + b.get()); //} System.out.println("====重置操作位置前:記錄位置===="); showProperty(b); //記錄位置 b.mark(); System.out.println("====重置操作位置前:獲取新數(shù)據(jù)===="); System.out.println("position-------->" + b.position() + ",get:" + b.get()); showProperty(b); System.out.println("====重置操作位置后===="); b.reset(); showProperty(b); System.out.println("====倒回緩沖區(qū)前===="); showProperty(b); System.out.println("====倒回緩沖區(qū)后===="); b.rewind(); showProperty(b); } //展示參數(shù) public static void showProperty(ByteBuffer b) { //容量 System.out.println("capacity:" + b.capacity()); //可存放個(gè)數(shù) System.out.println("limit:" + b.limit()); //下一個(gè)存入位置 System.out.println("position:" + b.position()); } }
Channel入門
介紹
理解 Channel理解為通道,包含了寫(xiě)入和讀取的操作,可以理解為IO中的流對(duì)象。Channel負(fù)責(zé)讀寫(xiě),Buffer負(fù)責(zé)存取。
常見(jiàn)分類:FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel
Channel與IO流區(qū)別
Channel是雙向的,既可以讀又可以寫(xiě),而IO是單向的。
Channel可以進(jìn)行異步的讀寫(xiě),IO是不支持異步。
Channel的讀寫(xiě)必須通過(guò)buffer對(duì)象,IO通過(guò)流可以直接讀寫(xiě)。
構(gòu)造方法(以FileChannel為例 )
在IO流FileXXX字節(jié)流中提供了getChannel()方法獲取FileChannel對(duì)象。
FileChannel getChannel() 通過(guò)FileXXX字節(jié)流的方法獲取對(duì)象
常用方法
int read(ByteBuffer dst):將數(shù)據(jù)讀取到緩沖區(qū)中
int write(ByteBuffer src):將數(shù)據(jù)從緩沖區(qū)中寫(xiě)出到指定位置
演示代碼
public class Test02FileChannel { public static void main(String[] args) throws IOException { FileChannel in = new FileInputStream("D:\\image.jpg").getChannel(); FileChannel out = new FileOutputStream("D:\\imageCopy.jpg").getChannel(); ByteBuffer b = ByteBuffer.allocate(10); int len = -1; while ((len = in.read(b)) != -1) { b.flip(); out.write(b); b.clear(); } in.close(); out.close(); } }
ChannelTCP協(xié)議編程
介紹
NIO中通過(guò)SocketChannel與ServerSocketChannel替代TCP協(xié)議的網(wǎng)絡(luò)通信編程
客戶端通道操作
SocketChannel 客戶端通道,用于讀寫(xiě)TCP網(wǎng)絡(luò)協(xié)議數(shù)據(jù)
獲取對(duì)象 public static SocketChannelopen()
連接服務(wù)器 boolean connect(SocketAddress remote)
SocketAddress是抽象類,使用其子類InetSocketAddress創(chuàng)建的對(duì)象。InetSocketAddress(String ip,int port)
等待客戶端連接 SocketChannel accept()
服務(wù)端通道操作
ServerSocketChannel 服務(wù)端通道,用于服務(wù)端監(jiān)聽(tīng)TCP連接
獲取對(duì)象 public static ServerSocketChannel open()
綁定端口號(hào) ServerSocketChannel bind(SocketAddress local)
服務(wù)器代碼
public class Test03ServerByChanner { public static void main(String[] args) throws IOException { //獲取服務(wù)器通道對(duì)象 ServerSocketChannel serverSocket = ServerSocketChannel.open(); //綁定端口 ServerSocketChannel socket = serverSocket.bind(new InetSocketAddress(8888)); SocketChannel server = socket.accept(); //接收數(shù)據(jù) System.out.println("服務(wù)端開(kāi)始接收數(shù)據(jù)......"); ByteBuffer buffer = ByteBuffer.allocate(1024); int len = -1; while ((len = server.read(buffer)) != -1) { //翻轉(zhuǎn)緩沖區(qū),讀取數(shù)據(jù) buffer.flip(); System.out.println("server:" + new String(buffer.array())); buffer.clear(); } System.out.println("服務(wù)端開(kāi)始反饋數(shù)據(jù)......"); buffer.put("數(shù)據(jù)收到了".getBytes()); //翻轉(zhuǎn)緩沖區(qū),讀取數(shù)據(jù) buffer.flip(); //取出緩沖區(qū)數(shù)據(jù),寫(xiě)會(huì)給客戶端 server.write(buffer); server.close(); } }
客戶端代碼
public class Test03ClientByChannel { public static void main(String[] args) throws Exception { //獲取連接對(duì)象 SocketChannel client = SocketChannel.open(); //連接服務(wù)器 client.connect(new InetSocketAddress("localhost", 8888)); //發(fā)送數(shù)據(jù) System.out.println("客戶端開(kāi)始發(fā)送數(shù)據(jù)......"); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("服務(wù)器,你好啊".getBytes()); //翻轉(zhuǎn)緩沖區(qū),讀取數(shù)據(jù) buffer.flip(); //從緩沖區(qū)取出數(shù)據(jù)寫(xiě)入通道 client.write(buffer); client.shutdownOutput(); //等待反饋 buffer.clear(); int len = -1; while ((len = client.read(buffer)) != -1) { buffer.flip(); System.out.println("client:" + new String(buffer.array())); buffer.clear(); } //關(guān)閉客戶端 client.close(); } }
多路復(fù)用
介紹
非多路復(fù)用:服務(wù)器端需要為每個(gè)端口的每次請(qǐng)求,開(kāi)辟線程處理業(yè)務(wù),高并發(fā)狀態(tài)下會(huì)造成系統(tǒng)性能下降。
多路復(fù)用:服務(wù)器端利用一個(gè)線程處理多個(gè)端口的訪問(wèn)請(qǐng)求,節(jié)省CPU資源,提高程序運(yùn)行效率,高并發(fā)狀態(tài)下有明顯優(yōu)勢(shì)。
核心知識(shí)
1.通過(guò)Selector中的open方法,獲取選擇器對(duì)象
public static Selector open():獲取Selector對(duì)象
2.通過(guò)Channel中的方法,注冊(cè)通道給選擇器
①創(chuàng)建通道對(duì)象,設(shè)置通道屏蔽模式
void configureBlocking(boolean block)
?、趯⑼ǖ雷?cè)給選擇器,并設(shè)置該通道的關(guān)注事件
SelectionKey register(Selector sel,int ops)
Selector sel 要注冊(cè)的選擇器
ops表示注冊(cè)的事件類型,在 SelectionKey類中提供的四種類型實(shí)現(xiàn)。
SelectionKey.OP_ACCEPT : 接收連接就緒事件,表示服務(wù)器監(jiān)聽(tīng)到了客戶連接,服務(wù)器可以接收這個(gè)連接了
SelectionKey.OP_CONNECT:連接就緒事件,表示客戶端和服務(wù)器的連接已經(jīng)建立成功
SelectionKey.OP_READ: 讀就緒事件,表示通道中有了可讀的數(shù)據(jù),可以執(zhí)行讀操作了
SelectionKey.OP_WRITE: 寫(xiě)就緒事件,表示已經(jīng)可以向通道中寫(xiě)數(shù)據(jù)了
注意事項(xiàng)
被注冊(cè)的Channel必須支持異步模式,否則異步NIO就無(wú)法工作,例如FileChannel(沒(méi)有異步模式)不能被注冊(cè)到Selector。
ServerSocketChannel在注冊(cè)時(shí),只能使用以O(shè)P_ACCEPT狀態(tài)注冊(cè),否則拋出異常。
SocketChannel在注冊(cè)時(shí),不支持OP_ACCEPT狀態(tài)注冊(cè)。
3.通過(guò)Selector中的方法,獲取事件
int select():將事件存放至事件集合,返回已就緒事件個(gè)數(shù)。如果沒(méi)有新的已就緒事件,該方法將持續(xù)阻塞。
Selector的Set selectedKeys():返回選擇器的已就緒事件集。
Set keys():返回選擇器的感興趣事件集(已注冊(cè)的事件數(shù))。
SelectionKey概述
SelectionKey 代表一個(gè)通道在Selector的注冊(cè)事件關(guān)系鍵。
當(dāng)Selector通知某個(gè)傳入事件時(shí),是通過(guò)對(duì)應(yīng) SelectionKey 進(jìn)行傳遞的。
想要取消已注冊(cè)的通道事件,需要通過(guò)SelectionKey的cancel方法完成。
SelectionKey中屬性:
Interest set:興趣集,表示已注冊(cè)的事件集合,下一次調(diào)用方法,將測(cè)試是否有此事件的加入。
通過(guò)SelectionKey的 int interestOps() 方法,可以獲取當(dāng)前 SelectionKey的感興趣事件。
Ready set:準(zhǔn)備集,表示已準(zhǔn)備就緒的事件集合。
通過(guò)SelectionKey的 int readyOps()方法,可以獲取當(dāng)前 SelectionKey的準(zhǔn)備就緒事件。
Channel:事件對(duì)應(yīng)的通道。
通過(guò)SelectionKey的 SelectableChannel channel()方法,可以獲取當(dāng)前 SelectionKey的表示的通道。
Selector:事件綁定的選擇器。
通過(guò)SelectionKey的 Selector selector() 方法,可以獲取當(dāng)前 SelectionKey的綁定的選擇器。
Attached:事件對(duì)象的附加信息。
通過(guò) SelectionKey的 Object attach(Object ob)方法,將給定對(duì)象附加到此鍵。
通過(guò) SelectionKey的 Object attachment()方法,檢索當(dāng)前的附件。
通過(guò) Channel的SelectionKey register(Selector sel,int ops,Object ob)方法,可以附件及獲取附加信
SelectionKey迭代器
4.通過(guò)SelectionKey中的方法,判斷事件
isAcceptable() 是否有準(zhǔn)備好接收新連接
isConnectable() 是否有完成連接狀態(tài)
isReadable() 是否有處于可讀取狀態(tài)
isWritable() 是否有處于可寫(xiě)入狀態(tài)
isValid() 是否是有效的鍵
步驟
1.獲取選擇器對(duì)象
2.創(chuàng)建通道對(duì)象,設(shè)置異步,注冊(cè)到選擇器
3.定義死循環(huán),重復(fù)檢查是否有新事件觸發(fā)(Selector中的int select()方法)
3.1.如果觸發(fā)新時(shí)間,獲取所有觸發(fā)事件集(Selector的Set selectedKeys()方法)
3.2.獲取觸發(fā)事件集合的迭代器
3.3.遍歷迭代器,獲取所有觸發(fā)的事件
3.3.1判斷觸發(fā)事件類型,指向相應(yīng)操作 舉例 if (selectionKey.isAcceptable()) {}
3.3.2刪除已完成操作的觸發(fā)事件 (Iterator的remove()方法)
服務(wù)器端代碼
public class Test04ServerBySelector { public static void main(String[] args) throws IOException, InterruptedException { //獲取一個(gè)選擇器 Selector selector = Selector.open(); //創(chuàng)建三個(gè)服務(wù)器通道,監(jiān)聽(tīng)三個(gè)端口 ServerSocketChannel serverChannel1 = ServerSocketChannel.open(); serverChannel1.bind(new InetSocketAddress(6666)); serverChannel1.configureBlocking(false); ServerSocketChannel serverChannel2 = ServerSocketChannel.open(); serverChannel2.bind(new InetSocketAddress(7777)); serverChannel2.configureBlocking(false); ServerSocketChannel serverChannel3 = ServerSocketChannel.open(); serverChannel3.bind(new InetSocketAddress(8888)); serverChannel3.configureBlocking(false); //將三個(gè)服務(wù)器通道注冊(cè)給選擇器 serverChannel1.register(selector, SelectionKey.OP_ACCEPT); serverChannel2.register(selector, SelectionKey.OP_ACCEPT); serverChannel3.register(selector, SelectionKey.OP_ACCEPT); //循環(huán)監(jiān)聽(tīng)三個(gè)通道 while (true) { System.out.println("--------"); System.out.println("等待客戶端連接..."); //獲取觸發(fā)的事件個(gè)數(shù) int keyCount = selector.select();//阻塞式方法 System.out.println("有一個(gè)客戶端連接成功..."); System.out.println("已就緒事件個(gè)數(shù)=" + keyCount); System.out.println("注冊(cè)通道數(shù)量=" + selector.keys().size()); //獲取觸發(fā)事件集 Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println("觸發(fā)事件數(shù)量=" + selectionKeys.size()); //獲取事件集迭代器 Iterator<SelectionKey> it = selectionKeys.iterator(); //遍歷事件集 while (it.hasNext()) { //獲取注冊(cè)鍵 SelectionKey selectionKey = it.next(); //使用選擇器完成數(shù)據(jù)讀取 if (selectionKey.isAcceptable()) { //獲取通道對(duì)象 ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); //獲取服務(wù)器與客戶端的連接 SocketChannel server = channel.accept(); //設(shè)置非阻塞 server.configureBlocking(false); //注冊(cè)讀取事件 server.register(selector, selectionKey.OP_READ); //selectionKey.interestOps(selectionKey.OP_READ); } else if (selectionKey.isReadable()) { //獲取客戶端數(shù)據(jù) ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel server = (SocketChannel) selectionKey.channel(); server.read(buffer); buffer.flip(); String content = new String(buffer.array(), 0, buffer.limit()); System.out.println("客戶端發(fā)送的數(shù)據(jù):" + content); //關(guān)閉資源 server.close(); } //刪除當(dāng)前觸發(fā)事件 it.remove(); } System.out.println("休息1秒,等待下一次操作..."); Thread.sleep(1000); } } }
客戶端代碼
public class Test04ClientByChannel { public static void main(String[] args) { int[] ports = {7777, 8888, 6666}; for (int i = 0; i < ports.length; i++) { int port = ports[i]; new Thread(new Runnable() { @Override public void run() { try { //創(chuàng)建客戶端通道 SocketChannel client = SocketChannel.open(); //連接服務(wù)器 client.connect(new InetSocketAddress("localhost", port)); //發(fā)送數(shù)據(jù) ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("你好啊,哈哈哈".getBytes()); buffer.flip(); client.write(buffer); //關(guān)閉資源 client.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } }
異步非阻塞交互(AIO)
介紹
支持異步操作的NIO體系
常見(jiàn)分類:
AsynchronousSocketChannel 客戶端異步通道
AsynchronousServerSocketChannel服務(wù)端異步通道
AsynchronousFileChannel文件異步通道
AsynchronousDatagramChannel 數(shù)據(jù)異步通道
CompletionHandler回調(diào)接口
void completed(V result,A attachment);異步操作成功被回調(diào)。
void failed(Throwable exc,A attachment);異步操作失敗時(shí)被回調(diào)。
AsynchronousSocketChannel常用方法
public static AsynchronousSocketChannel open();打開(kāi)異步服務(wù)器套接字通道。
void read(ByteBuffer dst,A attachment,CompletionHandler handler) 讀取數(shù)據(jù)。
void write(ByteBuffer src,A attachment,CompletionHandler handler) 寫(xiě)出數(shù)據(jù)
AsynchronousServerSocketChannel常用方法
public static AsynchronousServerSocketChannel open()打開(kāi)異步服務(wù)器套接字通道。
AsynchronousServerSocketChannel bind(SocketAddress local,int backlog) ;綁定服務(wù)端IP地址,端口號(hào)
void accept(A attachment,CompletionHandler handler) ;接收連接
服務(wù)器端代碼
package com.NIO.src.com.itheima; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; public class Test05ServerBySynChanner { //如果為true,服務(wù)器結(jié)束。 static boolean isOver = false; public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { //獲取服務(wù)器通道 AsynchronousServerSocketChannel serverChanner = AsynchronousServerSocketChannel.open(); //綁定端口號(hào) serverChanner.bind(new InetSocketAddress(8888)); // 獲取服務(wù)器與客戶端的對(duì)接 serverChanner.accept("accept", new CompletionHandler<AsynchronousSocketChannel, String>() { @Override public void completed(AsynchronousSocketChannel result, String attachment) { try { isOver=true; System.out.println("接受了一個(gè)連接:" + result.getLocalAddress() .toString()); // 給客戶端發(fā)送數(shù)據(jù)并等待發(fā)送完成 result.write(ByteBuffer.wrap("From Server:我是服務(wù)器".getBytes())) .get(); ByteBuffer readBuffer = ByteBuffer.allocate(128); // 阻塞等待客戶端接收數(shù)據(jù) result.read(readBuffer).get(); System.out.println(new String(readBuffer.array())); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, String attachment) { isOver=true; System.out.println("連接失敗"); } }); //由于異步執(zhí)行,所以上述操作不會(huì)阻礙當(dāng)前循環(huán)的執(zhí)行。 while (true) { if (isOver) { break; } System.out.println("服務(wù)端:先干為敬"); } } }
客戶端代碼
public class Test05ClientBySynChannel { public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { //創(chuàng)建客戶端通道對(duì)象 AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); //與服務(wù)器進(jìn)行連接 client.connect(new InetSocketAddress("localhost", 8888), "connect", new CompletionHandler<Void, String>() { @Override public void completed(Void result, String attachment) { System.out.println("連接到服務(wù)器成功!"); try { // 給服務(wù)器發(fā)送信息并等待發(fā)送完成 client.write(ByteBuffer.wrap("From client:我是服務(wù)器".getBytes())).get(); ByteBuffer readBuffer = ByteBuffer.allocate(128); // 阻塞等待接收服務(wù)端數(shù)據(jù) client.read(readBuffer).get(); System.out.println(new String(readBuffer.array())); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, String attachment) { System.out.println("連接到服務(wù)器失敗"); } }); } }