制作静态动漫网站模板,宁夏省住房城乡建设厅网站,百度广州分公司销售岗位怎么样,注册公司多少钱流程及费用写在前面
在这篇文章看了netty服务是如何启动的#xff0c;服务启动成功后#xff0c;也就相当于是迎宾工作都已经准备好了#xff0c;那么当客人来了怎么招待客人呢#xff1f;也就是本文要看的处理连接的工作。
1#xff1a;正文 先启动源码example模块的echoserver服务启动成功后也就相当于是迎宾工作都已经准备好了那么当客人来了怎么招待客人呢也就是本文要看的处理连接的工作。
1正文 先启动源码example模块的echoserver后启动echoclient进行debug调试。 从哪里开始呢是从EventLoop方法中开始在EventLoop中有一个run方法注意不是Thread的run方法通过死循环的方式不断的轮询selector的事件这里自然是轮询op_accept事件了故事就从这里开始了如下
// io.netty.channel.nio.NioEventLoop#run
// 系循环方式处理事件相当用户死循环selector.select
Override
protected void run() {int selectCnt 0;for (;;) {try {// ...if (ioRatio 100) {// ...} else if (strategy 0) {final long ioStartTime System.nanoTime();try {// 处理select到的事件processSelectedKeys();} finally {// ...}} else {// ...}// ...}
}processSelectedKeys如下
// io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {// processSelectedKeysOptimized是netty的优化版本使用了反射等更少的gc1%~2%的效率提升等if (selectedKeys ! null) {processSelectedKeysOptimized();} else {// 这里直接使用Java NIO的方式来获取发生发的事件了一般不走processSelectedKeysPlain(selector.selectedKeys());}
}这里执行processSelectedKeysOptimized
// io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized
private void processSelectedKeysOptimized() {for (int i 0; i selectedKeys.size; i) {final SelectionKey k selectedKeys.keys[i]; // 事件对应的selection key// null out entry in the array to allow to have it GCed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] null;final Object a k.attachment(); // 这里的attachment就是serversocketchannel了在serversocketchannle绑定到selector时指定的if (a instanceof AbstractNioChannel) { // 这里自然是true了因为我们使用的是Java NIO的方式processSelectedKey(k, (AbstractNioChannel) a);} else {// ...}// ...}
}注意代码final Object a k.attachment();获取到对应server socket channel比较重要接着看代码processSelectedKey(k, (AbstractNioChannel) a);:
// io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe ch.unsafe();// ...try {// ...// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loop 接受连接的话会执行到这里if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! 0 || readyOps 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}这里的readyOps自然就是op_read1了unsafe.read();,执行到io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages:
// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
protected int doReadMessages(ListObject buf) throws Exception {SocketChannel ch SocketUtils.accept(javaChannel()); // 内部会accept连接生成socketchannel类 serverSocketChannel.accept();try {if (ch ! null) {buf.add(new NioSocketChannel(this, ch)); // 存储到buf后续会从buf中拿return 1; // 代表buf中元素的数量}} catch (Throwable t) {// ...}return 0;
}SocketChannel ch SocketUtils.accept(javaChannel());就accept了接着执行代码
// io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
public void read() {// ...try {// ...int size readBuf.size();for (int i 0; i size; i ) {readPending false;pipeline.fireChannelRead(readBuf.get(i)); // 通过pipeline开始处理连接请求主要看ServerBootstrapAcceptor其他的都是辅助的handler如日志打印等}// ...} finally {// ...}
}pipeline.fireChannelRead(readBuf.get(i));主要看ServerBootstrapAcceptor其他的都是辅助的handler如日志打印等
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {// ...try {// 完成注册不管是serversocketchannel还是socketchannel都是注册到selector中childGroup.register(child).addListener(new ChannelFutureListener() {// ...});} catch (Throwable t) {forceClose(child, t);}
}childGroup.register:
// io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// ...// eventLoop.inEventLoop()判断是否为eventgrou线程这里不是如果是main启动的话则此时是main threadlogger.info(当前的线程是: Thread.currentThread().getName());if (eventLoop.inEventLoop()) {register0(promise);} else { // 使用eventloop的线程执行注册到seletor的工作try {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// ...}}
}此处在这篇文章已经看过了只不过这里是将socketchannel注册到selector而已。因为注册op事件比较重要所以这里再来看下
// io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {// ...// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// ...}}} catch (Throwable t) {// ...}
}pipeline.fireChannelActive();最终执行到
// io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey this.selectionKey;if (!selectionKey.isValid()) {return;}readPending true;// 完成op事件的注册对于serversocketchannel就是op_acceptfinal int interestOps selectionKey.interestOps();if ((interestOps readInterestOp) 0) {logger.info(注册op事件{}, (interestOps | readInterestOp));selectionKey.interestOps(interestOps | readInterestOp);}
}自然这里注册的op_code就是1op_read了。
2流程总结
1event loop死循环执行selectfor (;;) {}
2监听到op_accept事件acceept连接创建socketchannelSocketChannel ch SocketUtils.accept(javaChannel())
3绑定op_read事件等待读取数据selectionKey.interestOps(interestOps | readInterestOp);3两个线程
其中一个是boss group对应的eventloop中的线程另一个是worker group对应的event loop中的线程
boss线程轮询selectoraccept连接创建socket channel为读写客户端数据做准备初始化socket channel从worker group中选择一个event loop
worker线程绑定socketchannel到selector中注册socketchannel的op_read事件到selector中准备读取数据写在后面
参考文章列表
netty之是如何做好服务准备的。