这节介绍RocketMQ底层通信的原理
在之前的内容中有介绍过RocketMQ底层用了Netty来进行通信,下图为RocketMQ通信的大致过程,主要分为Server端和Client端。
客户端通过invokeSyncImpl、invokeAsyncImpl、invokeOnewayImpl这几个方法同服务端交互。
Server启动主要是初始化ServerBootstrap,主要配置如下:
启动完ServerBootstrap后会启动一个定时器,每3秒清除超时的请求。
这里介绍下面几个处理器:
NettyEncoder继承自LengthFieldBasedFrameDecoder,主要有用于解码入站数据流,并将数据流解码为RemotingCommand对象。
LengthFieldBasedFrameDecoder(自定义长度解码器)的构造器,涉及5个参数,都与长度域(数据包中的长度字段)相关,具体介绍如下:
以NettyEncoder为例,器构造构造方法为
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
即数据流中前4个字节的值表示有效数据域的长度,除开前4个字节外的内容都是有效数据域的内容,不存在偏移量。
接收到数据域的内容后,便会调用RemotingCommand.decode方法,将数据流转为RemotingCommand对象。
RemotingCommand对象分为Header部分和Body部分。Header部分包括固定的一组字段,已经长度不定的扩展字段;Body部分为byte[],不进行具体的细分。
数据域的解析过程同上面的类似,数据域中前4个自己为Header域的长度,取到Header长度后便能计算出Body长度,从而进行读取。RemotingCommand的内容如下:
根据serializerType的不同,Header的编码会分为Json或者二进制的方式。
NettyEncoder的反过程,将RemotingCommand对象序列化为ByteBuffer对象。根据serializerType的不同,Header会编码为JSON或者二进制。
NettyConnectManageHandler继承自ChannelDuplexHandler,用于监听pipeline中入站/出站的事件,主要进行日志记录。
NettyServerHandler继承自SimpleChannelInboundHandler,重写了channelRead0方法,在里面调用了父类NettyRemotingAbstract的processMessageReceived方法,如下:
该方法定义了请求和响应的处理过程。
1.processRequestCommand
处理请求过程,先根据RemotingCommand中的code值判断当前请求是否能够处理,如果不能处理则直接响应不支持。如果可以支持,则会找到对应的处理器,新起线程来处理当前请求。需要说明的是,NettyRemotingServer内部维护这一个processorTable,表示该server可以处理的command,对应的Processor以及对应的线程池。
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
Processor的定义如下,对于具体的command,会由对应的Processor来处理
public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception;
boolean rejectRequest();
}
RocketMQ提供的Processor如下,其中一个Processor可能会处理一个或者多个code.
2.processResponseCommand
客户端发起一次调用时,会根据请求id,构造一个ResponseFuture,并将其缓存在responseTable字段中,用来表示目前正在进行中的请求。
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
当有响应的时候,会根据请求id,获取对应的ResponseFuture,再进行后置处理,包括执行回调、释放资源等。
Client启动主要是初始化Bootstrap,主要配置如下:
启动完ServerBootstrap后会启动一个定时器,每3秒清除超时的请求。
Client端处理链上的几个处理器,除了NettyClientHandler外都同Server端的一样。而NettyClientHandler也继承自SimpleChannelInboundHandler,并重写了channelRead0方法,在里面调用了父类NettyRemotingAbstract的processMessageReceived方法,过程跟Server端类似。
上面介绍了Server端和Client端的启动过程,以及消息的编解码,这里介绍消息的具体请求过程。主要是开头提到的invokeSyncImpl、invokeAsyncImpl和invokeOnewayImpl这几个方法。
内部是通过countdownlatch等待来模拟的同步调用,如下图:
相比同步调用,少了等待超时时间,但是增加了semaphore信号量控制最多有多少个连接同时执行。请求发起后,将结果对象缓存起来,结果将通过InvokeCallback进行回调,如果有设置回调函数,结果返回,在回调线程发起后就会将信号量回收,如果没有设置回调函数,结果返回后就会将信号量回收。其余过程大致同同步调用类似。
单向请求,无结果,请求成功后不等待结果,直接释放信号量,服务端也不会返回结果。
MQClientAPIImpl在之前介绍过,主要为Producer和Consumer提供远程通信调用的功能,内部主要是对NettyRemotingClient的封装,以对外提供服务,如:
等多种服务的封装。同时MQClientAPIImpl也能够接收服务端的主动请求,从而进行响应,对外提供的具体功能如下,通过调用registerProcessor来添加:
以NOTIFY_CONSUMER_IDS_CHANGED为例,当Broker发现Group中的Consumer实例发生改变的时候,会遍历客户的连接Channel,然后逐一通知到客户端。这时候客户端的角色转变为”服务端“,服务端转变为"客户端",两端都会触发processResponseCommand方法。