本文共 12386 字,大约阅读时间需要 41 分钟。
第一次尝试看框架源码,大概就是先写一个demo,然后根据demo 的流程去看核心代码的实习,这个项目就是根据netty包下的echo 样例写的
/* * Copyright 2012 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */package io.netty.example.echo;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.InsecureTrustManagerFactory;/** 1. Sends one message when a connection is open and echoes back any received 2. data to the server. Simply put, the echo client initiates the ping-pong 3. traffic between the echo client and server by sending the first message to 4. the server. */public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { // Configure SSL.git final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }}
1、构造器方法public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null);2、this(nThreads, (Executor) null);调用构造器 public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider());}3、上面的 this(nThreads, executor, SelectorProvider.provider());调用下面构造器public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}4、public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}5、MultithreadEventLoopGroupprotected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}追踪到源码抽象类 MultithreadEventExecutorGroup的构造器方法 MultithreadEventExecutorGroup 才是NioEventLoopGroup真正的构造方法,这里可以看成是一个模板方法,使用了设计模式的模板模式
接着是MultithreadEventExecutorGroup的分析
this.terminatedChildren = new AtomicInteger(); this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } else { if (executor == null) { executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory()); } this.children = new EventExecutor[nThreads]; int j; for(int i = 0; i < nThreads; ++i) { boolean success = false; boolean var18 = false; try { var18 = true; this.children[i] = this.newChild((Executor)executor, args); success = true; var18 = false; } catch (Exception var19) { throw new IllegalStateException("failed to create a child event loop", var19); } finally { if (var18) { if (!success) { int j; for(j = 0; j < i; ++j) { this.children[j].shutdownGracefully(); } for(j = 0; j < i; ++j) { EventExecutor e = this.children[j]; try { while(!e.isTerminated()) { e.awaitTermination(2147483647L, TimeUnit.SECONDS); } } catch (InterruptedException var20) { Thread.currentThread().interrupt(); break; } } } } } if (!success) { for(j = 0; j < i; ++j) { this.children[j].shutdownGracefully(); } for(j = 0; j < i; ++j) { EventExecutor e = this.children[j]; try { while(!e.isTerminated()) { e.awaitTermination(2147483647L, TimeUnit.SECONDS); } } catch (InterruptedException var22) { Thread.currentThread().interrupt(); break; } } } } this.chooser = chooserFactory.newChooser(this.children); FutureListener
就是说
1)如果 executor是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty默认的线程工厂。
2)根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。 3)循环填充数组中的元素。如果异常,则关闭所有的单例线程池。 4)根据线程选择工厂创建一个线程选择器。 5)为每一个单例线程池添加一个关闭监听器。 6)将所有的单例线程池添加到一个 HashSet中。ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer() { @Overridepublic void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline();if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc()));}//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(new EchoServerHandler());}});
1)链式调用:group方法,将 boss和 worker传入,boss赋值给 parentGroup属性,worker赋值给 childGroup属性
2) channel方法传入 NioServerSocketChannel class对象。会根据这个 class创建 channel对象。 3) option方法传入 TCP参数,放在一个 LinkedHashMap中。 4) handler方法传入一个 handler中,这个 hanlder只专属于 ServerSocketChannel而不是 SocketChannel 5) childHandler传入一个 hanlder,这个 handler将会在每个客户端连接的时候调用。供 SocketChannel使用服务器就是在bind端口这里完成初始化的
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = this.initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } else if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
initAndRegister 这个方法作用就是把channel 注册到workgroup中的Eventloop中
Channel channel = null; try { channel = this.channelFactory.newChannel(); this.init(channel); } catch (Throwable var3) { if (channel != null) { channel.unsafe().closeForcibly(); } return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); } ChannelFuture regFuture = this.config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture;
最后:
1)基本说明: initAndRegister()初始化 NioServerSocketChannel通道并注册各个 handler,返回一个 future 2)通过 ServerBootstrap的通道工厂反射创建一个 NioServerSocketChannel。 3) init初始化这个 NioServerSocketChannel。 4) config().group().register(channel)通过 ServerBootstrap的 bossGroup注册 NioServerSocketChannel。 5)最后,返回这个异步执行的占位符即 regFuture。转载地址:http://ekxzi.baihongyu.com/