Netty Series: The Server Startup Process

Server Startup Demo

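import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;

public class NettyServer {

    public static void main(String[] args) {
        // Configure the server's NIO event loop groups: parentGroup accepts new
        // connections, childGroup handles I/O on the accepted connections.
        EventLoopGroup parentGroup = new NioEventLoopGroup(1);
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Set the parent and child groups. They correspond to the main Reactor
            // and the sub-Reactor of the Reactor pattern.
            serverBootstrap.group(parentGroup, childGroup);
            // Passing NioServerSocketChannel means the channel created is a server-side channel.
            serverBootstrap.channel(NioServerSocketChannel.class);

            // SO_BACKLOG applies to the listening channel. SO_KEEPALIVE applies to
            // accepted connections, so it is set with childOption().
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
                            // SimpleHandler is this demo's own business handler (not shown here).
                            ch.pipeline().addLast(new SimpleHandler());
                        }
                    });

            // Bind the server asynchronously; sync() blocks until the bind completes.
            ChannelFuture future = serverBootstrap.bind(8080).sync();
            // Block the current thread until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut down the EventLoopGroups to release all resources.
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}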

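As the demo shows, almost everything before the bind call just sets up parameters, so let's go straight to how ChannelFuture future = serverBootstrap.bind(8080).sync(); is implemented. First, a flow chart. Original image: https://blog.csdn.net/akfly/article/details/53240028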

Server startup flow chart

bind()

public ChannelFuture bind(SocketAddress localAddress) {
    validate(); // check that the required parameters are non-null
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) { // registration failed: return the failed future as-is
        return regFuture;
    }

    if (regFuture.isDone()) { // registration has already completed (failure was handled above)
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
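
The branching in doBind() follows a common future pattern: if registration has already finished, bind right away; otherwise attach a listener that binds once registration completes, or fails the promise with the same cause. The following is only a rough JDK-only sketch of that control flow. It uses CompletableFuture in place of Netty's ChannelFuture, and the names DoBindSketch, bindWhenRegistered and the string-returning doBind0 are hypothetical stand-ins, not Netty code.

```java
import java.util.concurrent.CompletableFuture;

final class DoBindSketch {

    // Hypothetical stand-in for the real bind step; it only reports what it would do.
    private static String doBind0(int port) {
        return "bound:" + port;
    }

    // Completes the returned promise after registration succeeds,
    // or fails it with the registration's cause.
    static CompletableFuture<String> bindWhenRegistered(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Registration already finished successfully: bind immediately.
            promise.complete(doBind0(port));
        } else {
            // Registration is pending (or has failed): decide in a listener.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete(doBind0(port));
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> promise = bindWhenRegistered(pending, 8080);
        System.out.println("done before registration: " + promise.isDone());
        pending.complete(null); // registration finishes later
        System.out.println("result after registration: " + promise.join());
    }
}
```

In both branches the caller gets a promise back immediately, which is why the demo has to call sync() to wait for the bind to finish.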

Channel registration

initAndRegister(): creates the regFuture
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
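
The closing comment in initAndRegister() rests on one property: tasks submitted to a single thread run in submission order, so a bind queued after a register cannot overtake it. Below is a small sketch of that property using a plain JDK single-thread executor as a stand-in for the event loop. The class name SingleThreadOrderingSketch and the "register"/"bind" labels are illustrative only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SingleThreadOrderingSketch {

    // Queues a "register" task and then a "bind" task on one thread
    // and returns the order in which they actually ran.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            eventLoop.execute(() -> log.add("register")); // queued first
            eventLoop.execute(() -> log.add("bind"));     // queued second, runs second
        } finally {
            eventLoop.shutdown(); // previously submitted tasks still run
        }
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```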
The NioServerSocketChannel class is instantiated via reflection.

ReflectiveChannelFactory.newChannel()

public T newChannel() {
    try {
        // clazz is the NioServerSocketChannel class passed in via .channel() at server startup
        return clazz.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}
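
To make the reflective factory idea concrete, here is a minimal standalone sketch. It is not Netty's class: ReflectiveFactorySketch is a hypothetical name, and it looks up the no-argument constructor once and calls Constructor.newInstance(), since Class.newInstance() as used in the excerpt above has been deprecated since Java 9.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactorySketch<T> {

    private final Constructor<? extends T> constructor;

    // Looks up the no-argument constructor once, so a missing constructor fails fast.
    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            this.constructor = clazz.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(clazz + " has no no-argument constructor", e);
        }
    }

    // Creates a fresh instance on every call.
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance from " + constructor, e);
        }
    }

    public static void main(String[] args) {
        // StringBuilder stands in for a channel class here.
        ReflectiveFactorySketch<CharSequence> factory = new ReflectiveFactorySketch<>(StringBuilder.class);
        System.out.println(factory.newInstance().getClass().getName());
    }
}
```

The caller only hands over a Class object, just as the demo passes NioServerSocketChannel.class to channel(), and the factory decides when the instance is actually created.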

NioServerSocketChannel

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

AbstractNioChannel, an ancestor class of NioServerSocketChannel, and its own parent class AbstractChannel initialize the pipeline, the unsafe, and several other fields.

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
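
Stripped of Netty's wrappers, the constructors above perform a few plain JDK NIO steps: open a ServerSocketChannel through a SelectorProvider, switch it to non-blocking mode, and remember OP_ACCEPT as the operation of interest. The sketch below shows those steps with the JDK alone. RawNioServerSketch and openAndRegister are hypothetical names, and registering with OP_ACCEPT right away is a simplification made for this example rather than a description of the exact moment Netty sets the interest op.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class RawNioServerSketch {

    // Opens a server channel, makes it non-blocking, registers it for OP_ACCEPT,
    // and reports whether the channel and key ended up in the expected state.
    static boolean openAndRegister() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel ch = provider.openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered with a selector.
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);
            return !ch.isBlocking()
                    && key.isValid()
                    && key.interestOps() == SelectionKey.OP_ACCEPT;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("registered as expected: " + openAndRegister());
    }
}
```

Binding to a port is deliberately left out: in the flow traced here it happens later, in doBind0().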

AbstractChannel

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

Registering the Channel with the selector

regFuture.cause(): checks whether the future's result is a CauseHolder. If it is, the wrapped cause is returned; otherwise the method returns null.

public Throwable cause() {
    Object result = this.result;
    return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
}
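
The idea behind cause() is that a single result field encodes the whole state of the future: null while pending, a marker object on success, and a wrapper around the Throwable on failure. Here is a stripped-down sketch of that encoding. ResultHolderSketch, SUCCESS, and the setter names are assumptions made for illustration, and the real promise class handles much more (listeners, cancellation, atomic updates).

```java
final class ResultHolderSketch {

    // Wraps a failure so it can be told apart from any successful result.
    private static final class CauseHolder {
        final Throwable cause;

        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }

    private static final Object SUCCESS = new Object();

    // null = not finished yet, SUCCESS = succeeded, CauseHolder = failed.
    private volatile Object result;

    void setSuccess() {
        result = SUCCESS;
    }

    void setFailure(Throwable cause) {
        result = new CauseHolder(cause);
    }

    boolean isDone() {
        return result != null;
    }

    // Same shape as the cause() method above: only a CauseHolder carries a cause.
    Throwable cause() {
        Object r = this.result;
        return (r instanceof CauseHolder) ? ((CauseHolder) r).cause : null;
    }

    public static void main(String[] args) {
        ResultHolderSketch future = new ResultHolderSketch();
        System.out.println("pending cause: " + future.cause());
        future.setFailure(new IllegalStateException("registration failed"));
        System.out.println("failed cause: " + future.cause().getMessage());
    }
}
```

This is why doBind() can test regFuture.cause() != null to detect a failed registration without first asking whether the future is done.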