百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

看我如何把NIO拉下神坛 将你拉下神坛

yuyutoo 2024-10-14 16:20 1 浏览 0 评论

1. 传统的阻塞式I/O



阻塞式I/O的阻塞指的是,socket的read函数、write函数是阻塞的。

1.2 阻塞式I/O编程模型

public static void main(String[] args) {
        
        try (ServerSocket serverSocket = new ServerSocket()) {
            // 绑定端口
            serverSocket.bind(new InetSocketAddress(8081));
            while (true) {

                // 轮询established
                Socket socket = serverSocket.accept();

                new Thread(() -> {
                    try (BufferedReader buffer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                         PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true)) {
                        // 读消息
                        while (true) {
                            String body = buffer.readLine();
                            if (body == null) {
                                break;
                            }
                            log.info("receive body: {}", body);
                        }

                        // 写消息
                        printWriter.write("server receive message!");

                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }).start();
            }

        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

因为socket的accept函数,read函数,write函数是同步阻塞的,所以主线程不断调用socket的accept函数,轮询状态是established的TCP连接。

read函数会从内核缓冲区中读取已经准备好的数据,复制到用户进程,如果内核缓冲区中没有数据,那么这个线程就的就会被挂起,相应的cpu的使用权被释放出来。当内核缓冲中准备好数据后,cpu会响应I/O的中断信号,唤醒被阻塞的线程处理数据。

当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。

阻塞式I/O模型



阻塞式I/O的缺点

缺乏扩展性,严重依赖线程。Java的线程占用内存在512K-1M,线程数量过多会导致JVM内存溢出。大量的线程上下文切换严重消耗CPU性能。大量的I/O线程被激活会导致系统锯齿状负载。

2. NIO编程

同步非阻塞I/O模型



对于NIO来说,如果内核缓冲区中没有数据就直接返回一个EWOULDBLOCK错误,一般来说进程可以轮询调用read函数,当缓冲区中有数据的时候将数据复制到用户空间,而不用挂起线程。

所以同步非阻塞中的非阻塞指的是socket的读写函数不是阻塞的,但是用户进程依然需要轮询读写函数,所以是同步的。但是NIO给我们提供了不需要新起线程就可以利用CPU的可能,也就是I/O多路复用技术

2.1 I/O多路复用技术

在linux系统中,可以使用select/poll/epoll使用一个线程监控多个socket,只要有一个socket的读缓存有数据了,方法就立即返回,然后你就可以去读这个可读的socket了,如果所有的socket读缓存都是空的,则会阻塞,也就是将线程挂起。

一开始用的linux用的是select,但是selct比较慢,最终使用了epoll。

2.1.1 epoll的优点

  1. 支持打开的socket描述符(FD)仅受限于操作系统最大文件句柄数,而select最大支持1024。
  2. selcet每次都会扫描所有的socket,而epoll只扫描活跃的socket。
  3. 使用mmap加速数据在内核空间到用户空间的拷贝。

2.2 NIO的工作机制

NIO实际上是一个事件驱动的模型,NIO中最重要的就是多路复用器(Selector)。在NIO中它提供了选择就绪事件的能力,我们只需要把通道(Channel) 注册到Selector上,Selector就会通过select方法(实际上操作系统是通过epoll)不断轮询注册在其上的Channel,如果某个Channel上发生了读就绪、写就绪或者连接到来就会被Selector轮询出来,然后通过SelectionKey(Channel注册到Selector上时会返回和其绑定的SelectionKey)可以获取到已经就绪的Channel集合,否则Selector就会阻塞在select方法上。

Selector调用select方法,并不是一个线程通过for循环去选择就绪的Channel,而是操作系统通过epoll以事件的方式的通知JVM的线程,哪个通道发生了读就绪或者写就绪的事件。所以select方法更像是一个监听器。

多路复用的核心目的就是使用最少的线程去操作更多的通道,在其内部并不是只有一个线程。创建线程的个数是根据通道的数量来决定的,每注册1023个通道就创建1个新的线程。

NIO的核心是多路复用器和事件模型,搞清楚了这两点其实就能搞清楚NIO的基本工作原理。原来在学习NIO的时候感觉很复杂,随着对TCP理解的深入,发现NIO其实并不难。在使用NIO的时候,最核心的代就是把Channel和要监听的事件注册到Selector上。

不同类型通道支持的事件


NIO事件模型示意图


2.2.1 代码示例

ServerReactor

@Slf4j
public class ServerReactor implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;
    private volatile boolean stop = false;

    public ServerReactor(int port, int backlog) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(port), backlog);
        serverSocket.setReuseAddress(true);
        serverSocketChannel.configureBlocking(false);
        // 将channel注册到多路复用器上,并监听ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void setStop(boolean stop) {
        this.stop = stop;
    }

    @Override
    public void run() {
        try {
            // 无限的接收客户端连接
            while (!stop && !Thread.interrupted()) {
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // 移除key,否则会导致事件重复消费
                    it.remove();
                    try {
                        handle(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    private void handle(SelectionKey key) throws Exception {
        if (key.isValid()) {
            // 如果是ACCEPT事件,代表是一个新的连接请求
            if (key.isAcceptable()) {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                // 相当于三次握手后,从全连接队列中获取可用的连接
                // 必须使用accept方法消费ACCEPT事件,否则将导致多路复用器死循环
                SocketChannel socketChannel = serverSocketChannel.accept();
                // 设置为非阻塞模式,当没有可用的连接时直接返回null,而不是阻塞。
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }

            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String content = new String(bytes);
                    System.out.println("recv client content: " + content);
                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                    writeBuffer.put(("服务端已收到: " + content).getBytes());
                    writeBuffer.flip();
                    socketChannel.write(writeBuffer);

                } else if (readBytes < 0) {
                    key.cancel();
                    socketChannel.close();
                }
            }

        }
    }
}

ClientReactor

public class ClientReactor implements Runnable {
    final String host;
    final int port;
    final SocketChannel socketChannel;
    final Selector selector;
    private volatile boolean stop = false;

    public ClientReactor(String host, int port) throws IOException {
        this.socketChannel = SocketChannel.open();
        this.socketChannel.configureBlocking(false);
        Socket socket = this.socketChannel.socket();
        socket.setTcpNoDelay(true);
        this.selector = Selector.open();
        this.host = host;
        this.port = port;

    }

    @Override
    public void run() {

        try {
            // 如果通道呈阻塞模式,则立即发起连接;
            // 如果呈非阻塞模式,则不是立即发起连接,而是在随后的某个时间才发起连接。

            // 如果连接是立即建立的,说明通道是阻塞模式,当连接成功时,则此方法返回true,连接失败出现异常。
            // 如果此通道处于阻塞模式,则此方法的调用将会阻塞,直到建立连接或发生I/O错误。

            // 如果连接不是立即建立的,说明通道是非阻塞模式,则此方法返回false,
            // 并且以后必须通过调用finishConnect()方法来验证连接是否完成
            // socketChannel.isConnectionPending()判断此通道是否正在进行连接
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);

            }
            while (!stop && !Thread.interrupted()) {
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // 移除key,否则会导致事件重复消费
                    it.remove();
                    try {
                        handle(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

        }


    }

    private void handle(SelectionKey key) throws IOException {

        if (key.isValid()) {

            SocketChannel socketChannel = (SocketChannel) key.channel();

            if (key.isConnectable()) {
                if (socketChannel.finishConnect()) {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    doWrite(socketChannel);
                }
            }

            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    System.out.println("recv server content: " + new String(bytes));
                } else if (readBytes < 0) {
                    key.cancel();
                    socketChannel.close();
                }
            }

        }
    }

    private void doWrite(SocketChannel socketChannel) {
        Scanner scanner = new Scanner(System.in);
        new Thread(() -> {
            while (scanner.hasNext()) {
                try {

                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                    writeBuffer.put(scanner.nextLine().getBytes());
                    writeBuffer.flip();
                    socketChannel.write(writeBuffer);
                } catch (Exception e) {

                }
            }
        }).start();
    }
}

作者:克里斯朵夫李维

链接:https://juejin.im/post/5dfae986518825122671c846

相关推荐

MySQL5.5+配置主从同步并结合ThinkPHP5设置分布式数据库

前言:本文章是在同处局域网内的两台windows电脑,且MySQL是5.5以上版本下进行的一主多从同步配置,并且使用的是集成环境工具PHPStudy为例。最后就是ThinkPHP5的分布式的连接,读写...

thinkphp5多语言怎么切换(thinkphp5.1视频教程)

thinkphp5多语言进行切换的步骤:第一步,在配置文件中开启多语言配置。第二步,创建多语言目录。相关推荐:《ThinkPHP教程》第三步,编写语言包。视图代码:控制器代码:效果如下:以上就是thi...

基于 ThinkPHP5 + Bootstrap 的后台开发框架 FastAdmin

FastAdmin是一款基于ThinkPHP5+Bootstrap的极速后台开发框架。主要特性基于Auth验证的权限管理系统支持无限级父子级权限继承,父级的管理员可任意增删改子级管理员及权限设置支持单...

Thinkphp5.0 框架实现控制器向视图view赋值及视图view取值操作示

本文实例讲述了Thinkphp5.0框架实现控制器向视图view赋值及视图view取值操作。分享给大家供大家参考,具体如下:Thinkphp5.0控制器向视图view的赋值方式一(使用fetch()方...

thinkphp5实现简单评论回复功能(php评论回复功能源码下载)

由于之前写评论回复都是使用第三方插件:畅言所以也就没什么动手,现在证号在开发一个小的项目,所以就自己动手写评论回复,没写过还真不知道评论回复功能听着简单,但仔细研究起来却无法自拔,由于用户量少,所以...

ThinkPHP框架——实现定时任务,定时更新、清理数据

大家好,我是小蜗牛,今天给大家分享一下,如何用ThinkPHP5.1.*版本实现定时任务,例如凌晨12点更新数据、每隔10秒检测过期会员、每隔几分钟发送请求保证ip的活性等本次分享,主要用到一个名为E...

BeyongCms系统基于ThinkPHP5.1框架的轻量级内容管理系统

BeyongCms内容管理系统(简称BeyongCms)BeyongCms系统基于ThinkPHP5.1框架的轻量级内容管理系统,适用于企业Cms,个人站长等,针对移动App、小程序优化;提供完善简...

YimaoAdminv3企业建站系统,使用 thinkphp5.1.27 + mysql 开发

介绍YimaoAdminv3.0.0企业建站系统,使用thinkphp5.1.27+mysql开发。php要求5.6以上版本,推荐使用5.6,7.0,7.1,扩展(curl,...

ThinkAdmin-V5开发笔记(thinkpad做开发)

前言为了快速开发一款小程序管理后台,在众多的php开源后台中,最终选择了基于thinkphp5的,轻量级的thinkadmin系统,进行二次开发。该系统支持php7。文档地址ThinkAdmin-V5...

thinkphp5.0.9预处理导致的sql注入复现与详细分析

复现先搭建thinkphp5.0.9环境...

thinkphp5出现500错误怎么办(thinkphp页面错误)

thinkphp5出现500错误,如下图所示:相关推荐:《ThinkPHP教程》require():open_basedirrestrictionineffect.File(/home/ww...

Thinkphp5.0极速搭建restful风格接口层

下面是基于ThinkPHPV5.0RC4框架,以restful风格完成的新闻查询(get)、新闻增加(post)、新闻修改(put)、新闻删除(delete)等server接口层。1、下载Thin...

基于ThinkPHP5.1.34 LTS开发的快速开发框架DolphinPHP

DophinPHP(海豚PHP)是一个基于ThinkPHP5.1.34LTS开发的一套开源PHP快速开发框架,DophinPHP秉承极简、极速、极致的开发理念,为开发集成了基于数据-角色的权限管理机...

ThinkPHP5.*远程代码执行高危漏洞手工与升级修复解决方法

漏洞描述由于ThinkPHP5框架对控制器名没有进行足够的安全检测,导致在没有开启强制路由的情况下,黑客构造特定的请求,可直接GetWebShell。漏洞评级严重影响版本ThinkPHP5.0系列...

Thinkphp5代码执行学习(thinkphp 教程)

Thinkphp5代码执行学习缓存类RCE版本5.0.0<=ThinkPHP5<=5.0.10Tp框架搭建环境搭建测试payload...

取消回复欢迎 发表评论: