<返回更多

彻底搞懂 Netty 线程模型

2020-10-14    
加入收藏



本文作者:何建辉(公众号:org_yijiaoqian)

点赞再看,养成习惯,微信搜一搜【一角钱技术】关注更多原创技术文章。本文 GitHub org_hejianhui/JAVAStudy 已收录,有我的系列文章。

基本概念

IO 模型

NIO 和 AIO 不同之处在于应用是否进行真正的读写操作。

reactor 和 proactor 模型

Netty认识

Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo、Rocketmq、Hadoop等。

通过前面 NIO 的学习可以看到,NIO 的类库和API 繁杂,例如 Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer等这些对于从事应用层的程序员来说,使用起来开发工作量和难度都非常大。另外客户端面临断连重连、 网络闪断、心跳处理、半包读写、 网络拥塞 和异常流的处理等等。

Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。

Netty 现在都在用的是4.x,5.x版本已经废弃,Netty 4.x 需要JDK 6以上版本支持。

在了解Netty使用场景后,本节将从IO模型的演进角度来分析Netty线程模型,通过并发编程之父Doug Lea所写《Scalable IO in Java》中涉及的一些IO处理模式,一步一步深入理解Netty线程模型的“进化历史”。

《Scalable IO in Java》:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Netty使用场景

  1. 互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 使用 Dubbo 协议进行通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信;消息中间件Rocketmq底层也是用的Netty作为基础通信组件。
  2. 游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言都得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
  3. 大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨节点通信,它的 Netty Service 是基于 Netty 框架二次封装实现。

Netty相关开源项目:

https://netty.io/wiki/related-projects.html

IO处理模式演进

基本上所有的网络处理程序都遵循以下基本的处理(handler)流程:

  1. Read request (接收二进制数据)
  2. Decode request (解码为可读数据)
  3. Process service (对数据进行处理产生结果)
  4. Encode reply (将结果编码为二进制数据)
  5. Send reply (返回结果)

传统网络服务器会为每一个连接的处理开启一个新的线程,即我们前面所说的BIO模型(多线程模式),我们可以看下大致的示意图:

彻底搞懂 Netty 线程模型

 

上图为 BIO版本

BIO模型对于每一个请求都分发给一个线程(可以理解为一个handler),每个handler中都独自处理上面1-5流程。这种模型的适用场景和瓶颈可以查看《BIO 、NIO 、AIO 总结》。

改进:采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理(非阻塞)。这就是对应的NIO线程模型。

彻底搞懂 Netty 线程模型

 

上图为单线程版事件驱动模型,可以理解为NIO单线程版本

上面用到了 Reactor 模式。关于 Reactor 模式的两个概念:

注意:Reactor 为单个线程,如上图所示:不仅需要处理客户端的 accept 连接请求,同时也要负责分发(dispatch)读写请求到处理器中。由于只有单个线程处理各种请求,所以要求处理器中的业务需要能够快速处理完。

改进:现在的服务器基本上是多核 CPU,那么在多处理器场景下,为实现服务的高性能我们可以有目的的采用多线程模式处理业务。

彻底搞懂 Netty 线程模型

 

上图为多线程版本事件驱动模型,可以理解为NIO多线程版本

通过与NIO单线程模型相比,增加了worker线程池,专门用于处理非IO操作(decode、compute、encode),大大提高了工作效率。

这种模型下,客户端发送过来的连接和注册还是由主线程 Reactor 统一去处理,只不过客户端连接成功后的后续事件分发给 worker 线程池去处理。

但是,当客户端短时间内几十万或者上百万条连接请求的时候(双十一、春运抢票),单个 Rector 不仅要处理注册事件,也要同时分发任务到 worker 线程池,由于分发也是比较耗时的操作,有可能会导致阻塞。

继续改进:将Reactor 拆分为两部分

彻底搞懂 Netty 线程模型

 

上图为主从NIO模型

如上图所示,在这种模型下,mainReactor 专门负责新客户端的连接操作,建立通道(channel),然后将一定事件内的 channel 注册到另外一个 subReactor,subReactor 负责将客户端的读写请求交给线程池处理。这样即使几十万个请求同时到来也无所谓了。

通俗理解,mainReactor就是大总管,只负责接口,subReactor就是一个员工,负责给总管接待客户提供服务。

Netty线程模型

通过对 《 Scalable IO in Java 》里的一些 IO 处理模式理解, Netty的线程模型就是由上面主从NIO模型演变来的,是基于Reactor模型的。

如下图所示,Boos Group 就是上面提到的 mainReactor,与上面不同在于 Worker Group,它可以理解为一组 subReactor,即在大总管下面有多个员工来干活,每次接收的客户都均匀分配给不同员工。Netty 之所以单机支持百万级别并发量,就是因为一主多从的线程模型。

彻底搞懂 Netty 线程模型

 

需要说明的是,Netty 的线程模型并不是一成不变的。它通常采用一主多从,但是也可以根据实际需要配置启动参数,通过设置不同的启动参数,Netty 可以同时支持 “多主多从”。

下面是对上图“一主多从” Netty 模型的详细解释:

  1. Netty 抽象除两组线程池 BossGroup 和 WorkerGroup,BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写。
  2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup。
  3. NioEventLoopGroup 相当于一个事件循环线程组,这个组中含有多个事件循环线程,每一个事件循环线程是 NioEventLoop。
  4. 每个 NioEventLoop 都有一个 selector,用于监听注册在其上的 socketChannel 的网络通讯。
  5. 每个 Boss NioEventLoop 线程内部循环执行的步骤有 3 步:处理accept事件,与 client 建立连接,生成 NIOSocketChannel;将NioSocketChannel 注册到某个 worker NioEventLoop 上的 selector;处理任务队列的任务,即runAllTasks。
  6. 每个 worker NIOEvent'Loop线程循环执行的步骤:轮询注册到最近的 selector 上所有的 NioSocketChannel 的 read、write 事件;处理 I/O 事件,即 read、write事件,在对应的NioScoketChannel 处理业务;runAllTask 处理任务队列 TaskQueue 的任务,一些耗时的业务处理一般可以放入 TaskQueue 中,这样不影响数据在 pipeline 中的流动处理。
  7. 每个 worker NIOEventLoop 处理 NioSocketChannel 业务时,会使用 pipeline (管道),管道中维护来很多 handler 处理器用来处理 channel 中的数据。

Netty 模块组件

Bootstrap、ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty程序,通过链式调用串联各个组件。Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端 启动引导类。

Future、ChannelFuture

正如前面介绍,在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

Channel

Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:

  1. 当前网络连接的通道的状态(例如是否打开?是否已连接?)
  2. 网络连接的配置参数 (例如接收缓冲区大小)
  3. 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
  4. 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
  5. 支持关联 I/O 操作与对应的处理程序。不同协议、不同的阻塞类型的连接都有不同的Channel 类型与之对应。下面是一些常用的 Channel 类型:
NioSocketChannel,异步的客户端 TCP Socket 连接。(最常用)
NioServerSocketChannel,异步的服务器端 TCP Socket 连接NioDatagramChannel,异步的 UDP 连接NioSctpChannel,异步的客户端 Sctp 连接NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

Selector

Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。

NioEventLoop

NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:

  1. I/O 任务即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
  2. 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。

NioEventLoopGroup

NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。

ChannelHandler

ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

ChannelInboundHandler 用于处理入站 I/O 事件
ChannelOutboundHandler 用于处理出站 I/O 操作

或者使用以下适配器类:

ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。

ChannelHandlerContext

保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。

ChannelPipline

保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:

彻底搞懂 Netty 线程模型

 

一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。

Netty通讯示例

Netty的maven依赖

<dependencies>
 <dependency>
  <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.52.Final</version>
    </dependency>
</dependencies>

服务端代码

package com.niuh.netty.base;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
    public static void main(String[] args) throws Exception {
        //创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
        // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //对workerGroup的SocketChannel设置处理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();
            //给cf注册监听器,监听我们关心的事件
            /*cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });*/
            //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端所注册的自定义回调函数 NettyServerHandler:

package com.niuh.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道channel,管道pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
        //Channel channel = ctx.channel();
        //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
        //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 数据读取完毕处理方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }
    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

这个类继承了ChannelInboundHandlerAdapter的几个方法:

客户端代码

package com.niuh.netty.base;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
    public static void main(String[] args) throws Exception {
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意客户端使用的不是ServerBootstrap而是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //加入处理器
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("netty client start。。");
            //启动客户端去连接服务器端
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
            //对通道关闭进行监听
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端自定义回调函数:

package com.niuh.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);    }    //当通道有读取事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

与NettyServerHandler类似,其中的channelActive()方法是当客户端与服务器连接完成时候就会执行的方法。

看完代码,我们发现Netty架的目标就是让你的业务逻辑从网络基础应用编码中分离出来,让你可以专 注业务的开发,而不需写一大堆类似NIO的网络处理操作。

ByteBuf 理解

从结构上来说,ByteBuf 由一串字节数组构成。数组中每个字节用来存放信息。

ByteBuf 提供了两个索引,一个用于读取数据,一个用于写入数据。这两个索引通过在字节数组中移动,来定位需要读或者写信息的位置。

彻底搞懂 Netty 线程模型

 

ByteBuf.png

需要注意的是极限的情况是 readerIndex 刚好读到了 writerIndex 写入的地方。如果 readerIndex 超过了 writerIndex 的时候,Netty 会抛出 IndexOutOf-BoundsException 异常。

示例代码

package com.niuh.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class NettyByteBuf {    public static void main(String[] args) {        // 创建byteBuf对象,该对象内部包含一个字节数组byte[10]
        // 通过readerindex和writerIndex和capacity,将buffer分成三个区域        // 已经读取的区域:[0,readerindex)
        // 可读取的区域:[readerindex,writerIndex)        // 可写的区域: [writerIndex,capacity)        ByteBuf byteBuf = Unpooled.buffer(10);
        System.out.println("byteBuf=" + byteBuf);
        for (int i = 0; i < 8; i++) {
            byteBuf.writeByte(i);        }        System.out.println("byteBuf=" + byteBuf);
        for (int i = 0; i < 5; i++) {
            System.out.println(byteBuf.getByte(i));        }        System.out.println("byteBuf=" + byteBuf);
        for (int i = 0; i < 5; i++) {
            System.out.println(byteBuf.readByte());        }        System.out.println("byteBuf=" + byteBuf);
        //用Unpooled工具类创建ByteBuf        ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,zhangsan!", CharsetUtil.UTF_8);
        //使用相关的方法        if (byteBuf2.hasArray()) {
            byte[] content = byteBuf2.array();
            //将 content 转成字符串            System.out.println(new String(content, CharsetUtil.UTF_8));            System.out.println("byteBuf=" + byteBuf2);
            System.out.println(byteBuf2.readerIndex()); // 0
            System.out.println(byteBuf2.writerIndex()); // 12
            System.out.println(byteBuf2.capacity()); // 36
            System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104
            int len = byteBuf2.readableBytes(); //可读的字节数  12
            System.out.println("len=" + len);
            //使用for取出各个字节
            for (int i = 0; i < len; i++) {
                System.out.println((char) byteBuf2.getByte(i));
            }            //范围读取            System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8));
            System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8));
        }    }}

PS:以上代码提交在 Github :https://github.com/Niuh-Study/niuh-netty.git

文章持续更新,可以微信搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>