1、阻塞

服务端代码

public class Server {    public static void main(String[] args) {        // 创建缓冲区        ByteBuffer buffer = ByteBuffer.allocate(16);        // 获得服务器通道        try(ServerSocketChannel server = ServerSocketChannel.open()) {            // 为服务器通道绑定端口            server.bind(new InetSocketAddress(8080));            // 用户存放连接的集合            ArrayList channels = new ArrayList<>();            // 循环接收连接            while (true) {                System.out.println("before connecting...");                // 没有连接时,会阻塞线程                SocketChannel socketChannel = server.accept();                System.out.println("after connecting...");                channels.add(socketChannel);                // 循环遍历集合中的连接                for(SocketChannel channel : channels) {                    System.out.println("before reading");                    // 处理通道中的数据                    // 当通道中没有数据可读时,会阻塞线程                    channel.read(buffer);                    buffer.flip();                    ByteBufferUtil.debugRead(buffer);                    buffer.clear();                    System.out.println("after reading");                }            }        } catch (IOException e) {            e.printStackTrace();        }    }}

客户端代码


(资料图)

public class Client {    public static void main(String[] args) {        try (SocketChannel socketChannel = SocketChannel.open()) {            // 建立连接            socketChannel.connect(new InetSocketAddress("localhost", 8080));            System.out.println("waiting...");        } catch (IOException e) {            e.printStackTrace();        }    }}

运行结果

2、非阻塞

服务器代码如下

public class Server {    public static void main(String[] args) {        // 创建缓冲区        ByteBuffer buffer = ByteBuffer.allocate(16);        // 获得服务器通道        try(ServerSocketChannel server = ServerSocketChannel.open()) {            // 设置为非阻塞模式,没有连接时返回null,不会阻塞线程            server.configureBlocking(false);            // 为服务器通道绑定端口            server.bind(new InetSocketAddress(8080));            // 用户存放连接的集合            ArrayList channels = new ArrayList<>();            // 循环接收连接            while (true) {                                              SocketChannel socketChannel = server.accept();                // 通道不为空时才将连接放入到集合中                if (socketChannel != null) {                    System.out.println("after connecting...");                    channels.add(socketChannel);                }                // 循环遍历集合中的连接                for(SocketChannel channel : channels) {                    // 处理通道中的数据                    // 设置为非阻塞模式,若通道中没有数据,会返回0,不会阻塞线程                    channel.configureBlocking(false);                    int read = channel.read(buffer);                    if(read > 0) {                        buffer.flip();                        ByteBufferUtil.debugRead(buffer);                        buffer.clear();                        System.out.println("after reading");                    }                }            }        } catch (IOException e) {            e.printStackTrace();        }    }}

这样写存在一个问题,因为设置为了非阻塞,会一直执行 while (true) 中的代码,CPU 一直处于忙碌状态,会使得性能变低,所以实际情况中不使用这种方法处理请求

3、Selector

多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

4、使用及 Accpet 事件

要使用 Selector 实现多路复用,服务端代码如下改进

public class SelectServer {    public static void main(String[] args) {        ByteBuffer buffer = ByteBuffer.allocate(16);        // 获得服务器通道        try(ServerSocketChannel server = ServerSocketChannel.open()) {            server.bind(new InetSocketAddress(8080));            // 创建选择器            Selector selector = Selector.open();                        // 通道必须设置为非阻塞模式            server.configureBlocking(false);            // 将通道注册到选择器中,并设置感兴趣的事件            server.register(selector, SelectionKey.OP_ACCEPT);            while (true) {                // 若没有事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转                // 返回值为就绪的事件个数                int ready = selector.select();                System.out.println("selector ready counts : " + ready);                                // 获取所有事件                Set selectionKeys = selector.selectedKeys();                                // 使用迭代器遍历事件                Iterator iterator = selectionKeys.iterator();                while (iterator.hasNext()) {                    SelectionKey key = iterator.next();                                        // 判断key的类型                    if(key.isAcceptable()) {                        // 获得key对应的channel                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();                        System.out.println("before accepting...");                                // 获取连接并处理,而且是必须处理,否则需要取消                        SocketChannel socketChannel = channel.accept();                        System.out.println("after accepting...");                                                // 处理完毕后移除                        iterator.remove();                    }                }            }        } catch (IOException e) {            e.printStackTrace();        }    }}

步骤解析

Selector selector = Selector.open();
// 通道必须设置为非阻塞模式server.configureBlocking(false);// 将通道注册到选择器中,并设置感兴趣的实践server.register(selector, SelectionKey.OP_ACCEPT);
// 获取所有事件Set selectionKeys = selector.selectedKeys();                // 使用迭代器遍历事件Iterator iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();                    // 判断key的类型,此处为Accept类型if(key.isAcceptable()) {        // 获得key对应的channel        ServerSocketChannel channel = (ServerSocketChannel) key.channel();        // 获取连接并处理,而且是必须处理,否则需要取消        SocketChannel socketChannel = channel.accept();        // 处理完毕后移除        iterator.remove();}}

事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

5、Read 事件

public class SelectServer {    public static void main(String[] args) {        ByteBuffer buffer = ByteBuffer.allocate(16);        // 获得服务器通道        try(ServerSocketChannel server = ServerSocketChannel.open()) {            server.bind(new InetSocketAddress(8080));            // 创建选择器            Selector selector = Selector.open();            // 通道必须设置为非阻塞模式            server.configureBlocking(false);            // 将通道注册到选择器中,并设置感兴趣的实践            server.register(selector, SelectionKey.OP_ACCEPT);            // 为serverKey设置感兴趣的事件            while (true) {                // 若没有事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转                // 返回值为就绪的事件个数                int ready = selector.select();                System.out.println("selector ready counts : " + ready);                // 获取所有事件                Set selectionKeys = selector.selectedKeys();                // 使用迭代器遍历事件                Iterator iterator = selectionKeys.iterator();                while (iterator.hasNext()) {                    SelectionKey key = iterator.next();                    // 判断key的类型                    if(key.isAcceptable()) {                        // 获得key对应的channel                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();                        System.out.println("before accepting...");                        // 获取连接                        SocketChannel socketChannel = channel.accept();                        System.out.println("after accepting...");                        // 设置为非阻塞模式,同时将连接的通道也注册到选择其中                        socketChannel.configureBlocking(false);                        socketChannel.register(selector, SelectionKey.OP_READ);                        // 处理完毕后移除                        iterator.remove();                    } else if (key.isReadable()) {                        SocketChannel channel = (SocketChannel) key.channel();                        System.out.println("before reading...");                        channel.read(buffer);                        System.out.println("after reading...");                        buffer.flip();                        ByteBufferUtil.debugRead(buffer);                        buffer.clear();                        // 处理完毕后移除                        iterator.remove();                    }                }            }        } catch (IOException e) {            e.printStackTrace();        }    }}

删除事件

当处理完一个事件后,一定要调用迭代器的 remove 方法移除对应事件,否则会出现错误。原因如下

以我们上面的 Read 事件 的代码为例

断开处理

当客户端与服务器之间的连接断开时,会给服务器端发送一个读事件,对异常断开和正常断开需要加以不同的方式进行处理

消息边界

不处理消息边界存在的问题

将缓冲区的大小设置为 4 个字节,发送 2 个汉字(你好),通过 decode 解码并打印时,会出现乱码

ByteBuffer buffer = ByteBuffer.allocate(4);// 解码并打印System.out.println(StandardCharsets.UTF_8.decode(buffer));你���

这是因为 UTF-8 字符集下,1 个汉字占用 3 个字节,此时缓冲区大小为 4 个字节,一次读时间无法处理完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好 的 好 字被拆分为了前半部分和后半部分发送,解码时就会出现问题

处理消息边界

传输的文本可能有以下三种情况

解决思路大致有以下三种

下文的消息边界处理方式为第二种:按分隔符拆分

附件与扩容

Channel 的 register 方法还有第三个参数:附件,可以向其中放入一个 Object 类型的对象,该对象会与登记的 Channel 以及其对应的 SelectionKey 绑定,可以从 SelectionKey 获取到对应通道的附件

public final SelectionKey register(Selector sel, int ops, Object att)

可通过 SelectionKey 的 attachment () 方法获得附件

ByteBuffer buffer = (ByteBuffer) key.attachment();

我们需要在 Accept 事件发生后,将通道注册到 Selector 中时,对每个通道添加一个 ByteBuffer 附件,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题

// 设置为非阻塞模式,同时将连接的通道也注册到选择其中,同时设置附件socketChannel.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16);// 添加通道对应的Buffer附件socketChannel.register(selector, SelectionKey.OP_READ, buffer);

当 Channel 中的数据大于缓冲区时,需要对缓冲区进行扩容操作。此代码中的扩容的判定方法: Channel 调用 compact 方法后,的 position 与 limit 相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用 SelectionKey 的 attach 方法将新的缓冲区作为新的附件放入 SelectionKey 中

// 如果缓冲区太小,就进行扩容if (buffer.position() == buffer.limit()) {    ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);    // 将旧buffer中的内容放入新的buffer中    ewBuffer.put(buffer);    // 将新buffer作为附件放到key中    key.attach(newBuffer);}

改造后的服务器代码如下

public class SelectServer {    public static void main(String[] args) {        // 获得服务器通道        try(ServerSocketChannel server = ServerSocketChannel.open()) {            server.bind(new InetSocketAddress(8080));            // 创建选择器            Selector selector = Selector.open();            // 通道必须设置为非阻塞模式            server.configureBlocking(false);            // 将通道注册到选择器中,并设置感兴趣的事件            server.register(selector, SelectionKey.OP_ACCEPT);            // 为serverKey设置感兴趣的事件            while (true) {                // 若没有事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转                // 返回值为就绪的事件个数                int ready = selector.select();                System.out.println("selector ready counts : " + ready);                // 获取所有事件                Set selectionKeys = selector.selectedKeys();                // 使用迭代器遍历事件                Iterator iterator = selectionKeys.iterator();                                while (iterator.hasNext()) {                    SelectionKey key = iterator.next();                    iterator.remove();                    // 判断key的类型                    if(key.isAcceptable()) {                        // 获得key对应的channel                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();                        System.out.println("before accepting...");                        // 获取连接                        SocketChannel socketChannel = channel.accept();                        System.out.println("after accepting...");                        // 设置为非阻塞模式,同时将连接的通道也注册到选择其中,同时设置附件                        socketChannel.configureBlocking(false);                        ByteBuffer buffer = ByteBuffer.allocate(16);                        socketChannel.register(selector, SelectionKey.OP_READ, buffer);                                                                    } else if (key.isReadable()) {                        SocketChannel channel = (SocketChannel) key.channel();                        System.out.println("before reading...");                        // 通过key获得附件(buffer)                        ByteBuffer buffer = (ByteBuffer) key.attachment();                        int read = channel.read(buffer);                        if(read == -1) {                            key.cancel();                            channel.close();                        } else {                            // 通过分隔符来分隔buffer中的数据                            split(buffer);                            // 如果缓冲区太小,就进行扩容                            if (buffer.position() == buffer.limit()) {                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);                                // 将旧buffer中的内容放入新的buffer中                                buffer.flip();                                newBuffer.put(buffer);                                // 将新buffer放到key中作为附件                                key.attach(newBuffer);                            }                        }                        System.out.println("after reading...");                                                                  }                }            }        } catch (IOException e) {            e.printStackTrace();        }    }    private static void split(ByteBuffer buffer) {        buffer.flip();        for(int i = 0; i < buffer.limit(); i++) {            // 遍历寻找分隔符            // get(i)不会移动position            if (buffer.get(i) == "\n") {                // 缓冲区长度                int length = i+1-buffer.position();                ByteBuffer target = ByteBuffer.allocate(length);                // 将前面的内容写入target缓冲区                for(int j = 0; j < length; j++) {                    // 将buffer中的数据写入target中                    target.put(buffer.get());                }                // 打印结果                ByteBufferUtil.debugAll(target);            }        }        // 切换为写模式,但是缓冲区可能未读完,这里需要使用compact        buffer.compact();    }}

ByteBuffer 的大小分配

6、Write 事件

服务器通过 Buffer 向通道中写入数据时,可能因为通道容量小于 Buffer 中的数据大小,导致无法一次性将 Buffer 中的数据全部写入到 Channel 中,这时便需要分多次写入,具体步骤如下

int write = socket.write(buffer);// 通道中可能无法放入缓冲区中的所有数据if (buffer.hasRemaining()) {    // 注册到Selector中,关注可写事件,并将buffer添加到key的附件中    socket.configureBlocking(false);    socket.register(selector, SelectionKey.OP_WRITE, buffer);}
SocketChannel socket = (SocketChannel) key.channel();// 获得bufferByteBuffer buffer = (ByteBuffer) key.attachment();// 执行写操作int write = socket.write(buffer);System.out.println(write);// 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣if (!buffer.hasRemaining()) {    key.attach(null);    key.interestOps(0);}

整体代码如下

public class WriteServer {    public static void main(String[] args) {        try(ServerSocketChannel server = ServerSocketChannel.open()) {            server.bind(new InetSocketAddress(8080));            server.configureBlocking(false);            Selector selector = Selector.open();            server.register(selector, SelectionKey.OP_ACCEPT);            while (true) {                selector.select();                Set selectionKeys = selector.selectedKeys();                Iterator iterator = selectionKeys.iterator();                while (iterator.hasNext()) {                    SelectionKey key = iterator.next();                    // 处理后就移除事件                    iterator.remove();                    if (key.isAcceptable()) {                        // 获得客户端的通道                        SocketChannel socket = server.accept();                        // 写入数据                        StringBuilder builder = new StringBuilder();                        for(int i = 0; i < 500000000; i++) {                            builder.append("a");                        }                        ByteBuffer buffer = StandardCharsets.UTF_8.encode(builder.toString());                        // 先执行一次Buffer->Channel的写入,如果未写完,就添加一个可写事件                        int write = socket.write(buffer);                        System.out.println(write);                        // 通道中可能无法放入缓冲区中的所有数据                        if (buffer.hasRemaining()) {                            // 注册到Selector中,关注可写事件,并将buffer添加到key的附件中                            socket.configureBlocking(false);                            socket.register(selector, SelectionKey.OP_WRITE, buffer);                        }                    } else if (key.isWritable()) {                        SocketChannel socket = (SocketChannel) key.channel();                        // 获得buffer                        ByteBuffer buffer = (ByteBuffer) key.attachment();                        // 执行写操作                        int write = socket.write(buffer);                        System.out.println(write);                        // 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣                        if (!buffer.hasRemaining()) {                            key.attach(null);                            key.interestOps(0);                        }                    }                }            }        } catch (IOException e) {            e.printStackTrace();        }    }}

7、优化

多线程优化

充分利用多核 CPU,分两组选择器

实现思路
实现代码
public class ThreadsServer {    public static void main(String[] args) {        try (ServerSocketChannel server = ServerSocketChannel.open()) {            // 当前线程为Boss线程            Thread.currentThread().setName("Boss");            server.bind(new InetSocketAddress(8080));            // 负责轮询Accept事件的Selector            Selector boss = Selector.open();            server.configureBlocking(false);            server.register(boss, SelectionKey.OP_ACCEPT);            // 创建固定数量的Worker            Worker[] workers = new Worker[4];            // 用于负载均衡的原子整数            AtomicInteger robin = new AtomicInteger(0);            for(int i = 0; i < workers.length; i++) {                workers[i] = new Worker("worker-"+i);            }            while (true) {                boss.select();                Set selectionKeys = boss.selectedKeys();                Iterator iterator = selectionKeys.iterator();                while (iterator.hasNext()) {                    SelectionKey key = iterator.next();                    iterator.remove();                    // BossSelector负责Accept事件                    if (key.isAcceptable()) {                        // 建立连接                        SocketChannel socket = server.accept();                        System.out.println("connected... ");                        socket.configureBlocking(false);                        // socket注册到Worker的Selector中                        System.out.println("before read...");                        // 负载均衡,轮询分配Worker                        workers[robin.getAndIncrement()% workers.length].register(socket);                        System.out.println("after read...");                    }                }            }        } catch (IOException e) {            e.printStackTrace();        }    }    static class Worker implements Runnable {        private Thread thread;        private volatile Selector selector;        private String name;        private volatile boolean started = false;        /**         * 同步队列,用于Boss线程与Worker线程之间的通信         */        private ConcurrentLinkedQueue queue;        public Worker(String name) {            this.name = name;        }        public void register(final SocketChannel socket) throws IOException {            // 只启动一次            if (!started) {                thread = new Thread(this, name);                selector = Selector.open();                queue = new ConcurrentLinkedQueue<>();                thread.start();                started = true;            }                        // 向同步队列中添加SocketChannel的注册事件            // 在Worker线程中执行注册事件            queue.add(new Runnable() {                @Override                public void run() {                    try {                        socket.register(selector, SelectionKey.OP_READ);                    } catch (IOException e) {                        e.printStackTrace();                    }                }            });            // 唤醒被阻塞的Selector            // select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark            selector.wakeup();        }        @Override        public void run() {            while (true) {                try {                    selector.select();                    // 通过同步队列获得任务并运行                    Runnable task = queue.poll();                    if (task != null) {                        // 获得任务,执行注册操作                        task.run();                    }                    Set selectionKeys = selector.selectedKeys();                    Iterator iterator = selectionKeys.iterator();                    while(iterator.hasNext()) {                        SelectionKey key = iterator.next();                        iterator.remove();                        // Worker只负责Read事件                        if (key.isReadable()) {                            // 简化处理,省略细节                            SocketChannel socket = (SocketChannel) key.channel();                            ByteBuffer buffer = ByteBuffer.allocate(16);                            socket.read(buffer);                            buffer.flip();                            ByteBufferUtil.debugAll(buffer);                        }                    }                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }}

本文由传智教育博学谷教研团队发布。

如果本文对您有帮助,欢迎关注点赞;如果您有任何建议也可留言评论私信,您的支持是我坚持创作的动力。

转载请注明出处!

推荐内容