首頁(yè)技術(shù)文章正文

如何通過(guò)NIO實(shí)現(xiàn)群聊?【黑馬程序員】

更新時(shí)間:2020-10-29 來(lái)源:黑馬程序員 瀏覽量:

1.NIO群聊實(shí)現(xiàn)步驟

·構(gòu)建Selector以及服務(wù)端監(jiān)聽(tīng)通

·道啟動(dòng)監(jiān)聽(tīng)并處理建立連接請(qǐng)求

·處理讀數(shù)據(jù)

·群發(fā)數(shù)據(jù)實(shí)現(xiàn)

·客戶端測(cè)試實(shí)現(xiàn)

2. 服務(wù)端實(shí)現(xiàn)
2.0 服務(wù)端完整代碼服務(wù)端的主要功能如下

(1)開(kāi)放監(jiān)聽(tīng)端口,方法ChatServer構(gòu)造方法

(2)處理鏈接請(qǐng)求,方法listener實(shí)現(xiàn)連接的建立

(2)讀取消息內(nèi)容,方法readData

(4)轉(zhuǎn)發(fā)消息給當(dāng)前所有在線的人,方法sendData2All

package com.hgy.chat;
/**
* 群聊服務(wù)器
*/
public class ChatServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    /**
    * 初始化服務(wù)端
    */
    public ChatServer() {
        try {
            // 創(chuàng)建Selector以及ServerSocketChannel
            selector = Selector.open();
            serverSocketChannel = serverSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8888));
            //將服務(wù)端監(jiān)聽(tīng)通道注冊(cè)到Selector中
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }   
    }
    /**
    * 監(jiān)聽(tīng)客戶端操作
    */
    public void listener() {
        while (true) {
            try {
                if (selector.select(1000) == 0) {
                continue;
            }
            //獲得所有有事件的key
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    //如果當(dāng)前key是處理鏈接類型
                    if (key.isAcceptable()) {
                        SocketChannel socketChannel =
                        serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                    // 當(dāng)前鏈接是讀數(shù)據(jù)類型
                    if (key.isReadable()) {
                        readData(key);
                    }
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
    * 讀取數(shù)據(jù)并群發(fā)給所有的用戶
    * @param key
    */
    private void readData(SelectionKey key) {
        try {
            if (key.isReadable()) {
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                channel.read(byteBuffer);
                String s = new String(byteBuffer.array());
                // 寫(xiě)到其他所有客戶端
                sendData2All(s);
        }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
    * 群發(fā)給所有的用戶
    * @param msg 需要發(fā)送的消息
    */
    private void sendData2All(String msg) {
        try {
            // 當(dāng)前在selector上注冊(cè)的所有key就是所有用戶
            Set<SelectionKey> keys = selector.keys();
            for (SelectionKey key : keys) {
                // 獲取每個(gè)用戶的通道
                SelectableChannel channel = key.channel();
                // 實(shí)現(xiàn)數(shù)據(jù)發(fā)送
                if (channel instanceof SocketChannel) {
                    System.out.println(":::" + msg);
                    ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
                    SocketChannel socketChannel = (SocketChannel) channel;
                    socketChannel.write(byteBuffer);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer();
        chatServer.listener();
    }
}


2.1 構(gòu)建Selector以及服務(wù)端監(jiān)聽(tīng)通道

當(dāng)ChatServer對(duì)象被創(chuàng)建時(shí)具體實(shí)現(xiàn)步驟如下

(1)創(chuàng)建serverSocketChannel對(duì)象

(2)設(shè)置處理模式為非阻塞模式

(3)綁定監(jiān)聽(tīng)端口

(4)將channel注冊(cè)到selector中

public class ChatServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    /**
    * 初始化服務(wù)端
    */
    public ChatServer() {
        try {
            // 創(chuàng)建Selector以及ServerSocketChannel
            selector = Selector.open();
            serverSocketChannel = serverSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8888));
            //將服務(wù)端監(jiān)聽(tīng)通道注冊(cè)到Selector中
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


2.2 實(shí)現(xiàn)監(jiān)聽(tīng)并處理建立連接請(qǐng)求

連接請(qǐng)求處理實(shí)現(xiàn)步驟

(1)獲得所有有事件的key,通過(guò)key就可以拿到用戶的SocketChannel

(2)循環(huán)遍歷每一個(gè)key,判斷當(dāng)前是讀事件,還是建立連接事件

(3)如果是建立連接事件則直接將該通道注冊(cè)到selector中

(4)如果是讀數(shù)據(jù)事件就交給具體的讀數(shù)據(jù)方法處理數(shù)據(jù)

 

2.3 處理讀數(shù)據(jù)數(shù)據(jù)

處理的具體實(shí)現(xiàn)步驟

(1)通過(guò)key獲取和用戶連接的通道(相當(dāng)于輸入流)

(2)獲取通道的數(shù)據(jù)并打印

(3)將數(shù)據(jù)轉(zhuǎn)發(fā)給其他在線用戶

public void listener() {
    while (true) {
        try {
            if (selector.select(1000) == 0) {
                continue;
            }
            //獲得所有有事件的key
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //如果當(dāng)前key是處理鏈接類型
                if (key.isAcceptable()) {
                    SocketChannel socketChannel =
                    serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                // 當(dāng)前鏈接是讀數(shù)據(jù)類型
                if (key.isReadable()) {
                    readData(key);
                }
                iterator.remove();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


2.4 群發(fā)數(shù)據(jù)實(shí)現(xiàn)

數(shù)據(jù)群發(fā)實(shí)現(xiàn)步驟

(1)當(dāng)前在線用戶實(shí)際上就是selector中所有注冊(cè)的key,也就是在線的用戶

(2)通過(guò)key拿到和用戶的鏈接講消息轉(zhuǎn)發(fā)出去

/**
* 監(jiān)聽(tīng)客戶端操作
*/
/**
* 讀取數(shù)據(jù)并群發(fā)給所有的用戶
* @param key
*/
private void readData(SelectionKey key) {
    try {
        if (key.isReadable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            channel.read(byteBuffer);
            String s = new String(byteBuffer.array());
            // 寫(xiě)到其他所有客戶端
            sendData2All(s);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}


2.5 啟動(dòng)服務(wù)端

public static void main(String[] args) {
    ChatServer chatServer = new ChatServer();
    chatServer.listener();
}


3. 客戶端實(shí)現(xiàn)

客戶端實(shí)現(xiàn)

(1)首先創(chuàng)建SocketChannel對(duì)象并鏈接到具體的服務(wù)器

(2)將通道注冊(cè)到selector中

(3)開(kāi)啟一個(gè)新的線程監(jiān)聽(tīng)selector中所有key的事件

(4)在主線程中循環(huán)阻塞獲取用戶的輸入 

public class ChatClient {
    public static void main(String[] args) throws Exception {
    // 客戶端代碼, 建立連接
        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open(new
        InetSocketAddress("127.0.0.1", 8888));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        // 開(kāi)啟一個(gè)新的線程輪詢當(dāng)前客戶是否有可讀消息
        new Thread(() -> {
            while (true) {
                try {
                    int select = selector.select(1000);
                    // 有可讀消息進(jìn)行解析打印
                    if (select > 0) {
                        for (SelectionKey key : selector.selectedKeys()) {
                            if (key.isReadable()) {
                                SocketChannel channel = (SocketChannel)
                                key.channel();
                                ByteBuffer byteBuffer =
                                ByteBuffer.allocate(1024);
                                channel.read(byteBuffer);
                                System.out.println(":==:" + new
                                String(byteBuffer.array()));
                                // 寫(xiě)到其他所有客戶端
                                System.out.println(new
                                String(byteBuffer.array()));
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }   
            }
        }).start();
        // 主線程中循環(huán)獲取用戶輸入的聊天消息
        while(true) {
            Scanner scanner = new Scanner(System.in);
            //發(fā)送用戶的消息
            socketChannel.write(ByteBuffer.wrap(scanner.nextLine().getBytes()));
        }
    }
}


猜你喜歡:

什么是敏捷開(kāi)發(fā)?十分鐘了解

IO流、字節(jié)流和字符流分別是什么?

Java類加載機(jī)制詳解 



分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!