用户可以通过 netty 的 ServerBoostrap 启动服务端,时序图如下:
输入图片说明
为了便于大家理解,我们把服务端启动的代码放在下面:
public void run() throws Exception {
/**
* EventLoopGroup 是用来处理I/O操作的多线程事件循环器
* bossGroup: 用来接收进来的连接
* workerGroup: 用来处理已经被接收的连接
* 一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/**
* ServerBootstrap 是一个启动 NIO 服务的辅助启动类。
* 你可以在这个服务中直接使用 Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
*/
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
//指定使用 NIOServerSocketChannel 类来举例说明一个新的 Channel 如何接收进来的连接。
.channel(NioServerSocketChannel.class) // (3)
/**
* 这里的事件处理类经常会被用来处理一个最近的已经接收的 Channel。
* ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。
* 也许你想通过增加一些处理类比如DiscardServerHandler 来配置一个新的 Channel 或者其对应的ChannelPipeline 来实现你的网络程序。
* 当你的程序变的复杂时,可能你会增加更多的处理类到 pipeline 上,然后提取这些匿名类到最顶层的类上。
*/
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
/**
* 你可以设置这里指定的 Channel 实现的配置参数。
* 我们正在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。
* 请参考 ChannelOption 和详细的 ChannelConfig 实现的接口文档以此可以对ChannelOption 的有一个大概的认识。
*
* option() 是提供给 NioServerSocketChannel 用来接收进来的连接。
* childOption() 是提供给由父管道 ServerChannel 接收到的连接,在这个例子中也是 NioServerSocketChannel。
*/
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
/**
* 剩下的就是绑定端口然后启动服务。这里我们在机器上绑定了机器所有网卡上的 8080 端口。
* 当然现在你可以多次调用 bind() 方法(基于不同绑定地址)。
*/
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync(); // (7)
System.out.println("DiscardServer start...");
// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
(1)Builder 构建者模式
为了解决参数较多的问题,这里 netty ServerBootstrap 使用了 builder 模式,可以大大降低我们的配置量,也可以灵活指定配置。
(2)EventLoopGroup 线程池
为了提升性能,线程池是一个自然的选择。
(3)设置并且绑定 channel
可以发现我们源码中只有一句话 channel(NioServerSocketChannel.class)
这个方法源码如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
很显然,这里使用了工厂模式。
我们只需要简单的指定 class 信息,netty 会自动通过反射创建对应的实现类。
(4)初始化 ChannelPipeline
流水线使用了责任链模式,用于处理一系列的 ChannelHandler。
(5)添加 ChannelHandler
netty 这里的 initChannel 方法,可以让我们非常方便的添加 ChannelHandler。
对个人的影响也比较大,我写的很多工具方法也会采用类似的模式。
(6)绑定并且启动监听端口
我们使用时只有非常优雅的一句话 ChannelFuture f = b.bind(port).sync();,实际上 netty 为我们做了一些封装:
虽然我们只指定了 port,本质上肯定还是 socket 地址,只不过默认 ip 为本地而已。
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
doBind 的实现还是比较多的,暂时不做展开。
(7)selector 轮训 & 触发
写过 JAVA nio 的小伙伴们肯定知道,需要通过 selector 轮训获取消息。
实际上 netty 将这些细节封装了起来。轮训准备就绪 Channel 之后,将由 Reactor 线程 NioEventLoop 执行 ChannelPipeline 的响应方法,最终调用到 ChannelHandler。
ChannelHandler 中包含了系统内置的处理类,和用户自定义的处理类。
每一个版本的源码可能有差异,这里老马的版本是 4.1.17.Final。
EventLoopGroup 就是 Reactor 线程池。
group 方法如下:
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
这里调用了父类的方法 super.group(parentGroup);,实现如下:
/**
* The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
* {@link Channel}
*/
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
这个方法主要用于设置 IO 线程,执行和调度网络事件的读写。
channel 的方法如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
ReflectiveChannelFactory 反射的核心实现如下:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
实际上就是通过 NioServerSocketChannel 创建了 Channel 对象。
启动类可以为启动服务类和父类,分别设置 Handler。
这个也是一开始老马学习 netty 比较迷惑的地方,这两个有啥区别呢?
输入图片说明
本质区别:
(1)ServerBoostrap 中的 Handler 是 NioServerSocketChannel 使用的,所有连接这个监听端口的客户端都会执行。
(2)父类 AbstractServerBoostrap 中的 Handler 是一个工厂类,会为每一个接入的客户端都创建一个新的 Handler。
最后,还有服务端的端口绑定。我们上面只是简单的过了一下,这里做一下展开:
private ChannelFuture doBind(final SocketAddress localAddress) {
//1. 创建 channel 并注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//2. 创建完成后,设置对应的附加属性
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
//3. 添加监听器
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
initAndRegister() 完整实现如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//1. 通过 channelFactory 创建新的 channel
channel = channelFactory.newChannel();
//2. 初始化相关属性
init(channel);
} catch (Throwable t) {
// 省略
}
// 省略
return regFuture;
}
init 是一个抽象方法,服务端实现如下:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
//1. 设置 Socket 参数和 NioserverSocketChannel 的 附加属性
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 属性省略
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
//2. 将 AbstractBoostrap 的 Handler 添加到 NioserverSocketChannel 的 pipeline 中
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
//3. 将用于服务端注册的 ServerBootstrapAcceptor 添加到 pipeline 中
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
到这里,服务端的监听相关资源已经初始化完毕。
接下来,需要把 NioserverSocketChannel 注册到 Reactor 线程的多路复用选择器上,然后轮训客户端事件。
构造器如下:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
这个就是默认 channel 初始化的构造器,实际调用的是:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
return provider.openServerSocketChannel();
}
归根到底,默认的 SelectorProvider 应该是 jdk nio 的 DefaultSelectorProvider。
实际上,还是根据初始化 ServerSocketChannel:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
可以看到,这里默认注册监听了 SelectionKey.OP_ACCEPT 事件。
其中 SelectionKey 只有 4 种:
public static final int OP_ACCEPT = 1 << 4;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_WRITE = 1 << 2;
public static final int OP_READ = 1 << 0;
注册的源码比较多,看得人云里雾里的。
可以理解,就是首先注册自己感兴趣的事件,发生的时候通知你即可。
注册的方法如下我们主要看 NioEventLoop 即可,这个类继承自 SingleThreadEventLoop 类。
实现了 SingleThreadEventExecutor 类的 run 方法,如下:
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
// 省略
processSelectedKeys();
// 省略
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
// 省略
}
}
我们只看核心的部分,这里实际上就是一个死循环,注册的部分核心如下:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
// 省略
ch.register(selector, interestOps, task);
// 省略
}
在 AbstractSelectableChannel 中的实现如下:
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
// 省略
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
这里实际上是注册感兴趣的事件,服务端到这里基本上已经告一段落了。
下面我们看一下 NioEventLoop 是如何处理客户端请求的。
当多路复用器就绪时,默认执行 processSelectedKeysOptimized() 方法:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// 这里处理的 attachment 是 AbstractNioChannel
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
这里实际上是根据不同的的类型,执行不同的操作:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
我们来重点关注下 read 方法。
NioUnsafe 是一个接口,有两个子类:NioByteUnsafe 和 NioMessageUnsafe。
NioServerSocketChannel 继承自 AbstractNioMessageChannel,使用的是 NioMessageUnsafe 类。
实现如下:
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 核心方法
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
// 处理读取的信息
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发 channel read
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 触发 read complete
pipeline.fireChannelReadComplete();
} finally {
// 省略
}
}
doReadMessages 在 NioServerSocketChannel 类中实现如下:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
// 省略
}
return 0;
}
这里就是 jdk nio 中的接收到一个新的客户端请求的方法实现。
读取完成之后,触发 fireChannelRead,如下:
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
如下:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
invokeChannelRead 如下:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
实际上最后就是一个责任链去调用各种 ChannelInboundHandler 类。
到此,客户端接入完成。
可以进行网络读写等 IO 操作。