博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty学习 (七、源码剖析Netty启动过程源码)
阅读量:3958 次
发布时间:2019-05-24

本文共 12386 字,大约阅读时间需要 41 分钟。

1、看源码技巧

第一次尝试看框架源码,大概就是先写一个demo,然后根据demo 的流程去看核心代码的实习,这个项目就是根据netty包下的echo 样例写的

2、源码启动类如下

/* * Copyright 2012 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * *   https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */package io.netty.example.echo;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.InsecureTrustManagerFactory;/** 1. Sends one message when a connection is open and echoes back any received 2. data to the server.  Simply put, the echo client initiates the ping-pong 3. traffic between the echo client and server by sending the first message to 4. the server. */public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception {
// Configure SSL.git final SslContext sslCtx; if (SSL) {
sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else {
sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try {
Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer
() {
@Override public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline(); if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally {
// Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }}
  1. 首先创建了关于SSL的配置类
  2. 然后创建俩个EventLoopGroup对象
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workGroup = new NioEventLoopGroup();
  3. 其中 bossGroup对象用来接受客户端请求,他会将请求交给workGroup,workGroup会获取到真正的连接,然后和连接进行通信,比如解码编码等操作
  4. EventLoopGroup 是事件循环组,也就是个线程组,其中含有多个Eventloop(线程),可以注册channel,用于在事件循环中去进行选择,也就是每个Eventloop维护了一个Selctor 还有taskQueue,通过选择器,注册channel到socketserver
  5. EventLoopGroup bossGroup = new NioEventLoopGroup(1), 这个1表示boosGroup事件组有一个线程你可以指定
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2));
    会创建 EventExecutor数组 children = new EventExecutor[nThreads]; //debug一下
    每个元素的类型就是 NIOEventLoop, NIOEventLoop实现了 EventLoop接口和 Executor接口
  6. try块中创建了一个 ServerBootstrap对象,他是一个引导类,用于启动服务器和引导整个程序的初始化了 。就是封装了创建NIO连接的细节,其中需要传入一个bossgroup线程组用于管理连接,一个work线程组分配响应事件、其中还需要创建一个channel的反射工厂用于创建每个连接类型的bean,再然后就是创建一个双向链表,管理channelhandler
  7. 绑定端口并阻塞至连接成功
  8. 最后 main线程阻塞等待关闭
EventLoopGroup的过程分析如下
1、构造器方法public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);2、this(nThreads, (Executor) null);调用构造器 public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());}3、上面的 this(nThreads, executor, SelectorProvider.provider());调用下面构造器public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}4、public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}5、MultithreadEventLoopGroupprotected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}追踪到源码抽象类 MultithreadEventExecutorGroup的构造器方法 MultithreadEventExecutorGroup 才是NioEventLoopGroup真正的构造方法,这里可以看成是一个模板方法,使用了设计模式的模板模式

接着是MultithreadEventExecutorGroup的分析

this.terminatedChildren = new AtomicInteger();        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);        if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } else {
if (executor == null) {
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory()); } this.children = new EventExecutor[nThreads]; int j; for(int i = 0; i < nThreads; ++i) {
boolean success = false; boolean var18 = false; try {
var18 = true; this.children[i] = this.newChild((Executor)executor, args); success = true; var18 = false; } catch (Exception var19) {
throw new IllegalStateException("failed to create a child event loop", var19); } finally {
if (var18) {
if (!success) {
int j; for(j = 0; j < i; ++j) {
this.children[j].shutdownGracefully(); } for(j = 0; j < i; ++j) {
EventExecutor e = this.children[j]; try {
while(!e.isTerminated()) {
e.awaitTermination(2147483647L, TimeUnit.SECONDS); } } catch (InterruptedException var20) {
Thread.currentThread().interrupt(); break; } } } } } if (!success) {
for(j = 0; j < i; ++j) {
this.children[j].shutdownGracefully(); } for(j = 0; j < i; ++j) {
EventExecutor e = this.children[j]; try {
while(!e.isTerminated()) {
e.awaitTermination(2147483647L, TimeUnit.SECONDS); } } catch (InterruptedException var22) {
Thread.currentThread().interrupt(); break; } } } } this.chooser = chooserFactory.newChooser(this.children); FutureListener terminationListener = new FutureListener() {
public void operationComplete(Future future) throws Exception {
if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null); } } }; EventExecutor[] arr$ = this.children; j = arr$.length; for(int i$ = 0; i$ < j; ++i$) {
EventExecutor e = arr$[i$]; e.terminationFuture().addListener(terminationListener); } Set
childrenSet = new LinkedHashSet(this.children.length); Collections.addAll(childrenSet, this.children); this.readonlyChildren = Collections.unmodifiableSet(childrenSet); }

就是说

1)如果 executor是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty默认的线程工厂。

2)根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
3)循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
4)根据线程选择工厂创建一个线程选择器。
5)为每一个单例线程池添加一个关闭监听器。
6)将所有的单例线程池添加到一个 HashSet中。

ServerBootstrap创建和构造过程
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer
() {
@Overridepublic void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));}//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(new EchoServerHandler());}});

1)链式调用:group方法,将 boss和 worker传入,boss赋值给 parentGroup属性,worker赋值给 childGroup属性

2) channel方法传入 NioServerSocketChannel class对象。会根据这个 class创建 channel对象。
3) option方法传入 TCP参数,放在一个 LinkedHashMap中。
4) handler方法传入一个 handler中,这个 hanlder只专属于 ServerSocketChannel而不是 SocketChannel
5) childHandler传入一个 hanlder,这个 handler将会在每个客户端连接的时候调用。供 SocketChannel使用

绑定端口分析

服务器就是在bind端口这里完成初始化的

private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = this.initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) {
return regFuture; } else if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else {
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause(); if (cause != null) {
promise.setFailure(cause); } else {
promise.registered(); AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }

initAndRegister 这个方法作用就是把channel 注册到workgroup中的Eventloop中

Channel channel = null;        try {
channel = this.channelFactory.newChannel(); this.init(channel); } catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly(); } return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); } ChannelFuture regFuture = this.config().group().register(channel); if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close(); } else {
channel.unsafe().closeForcibly(); } } return regFuture;

最后:

1)基本说明: initAndRegister()初始化 NioServerSocketChannel通道并注册各个 handler,返回一个 future
2)通过 ServerBootstrap的通道工厂反射创建一个 NioServerSocketChannel。
3) init初始化这个 NioServerSocketChannel。
4) config().group().register(channel)通过 ServerBootstrap的 bossGroup注册 NioServerSocketChannel。
5)最后,返回这个异步执行的占位符即 regFuture。

转载地址:http://ekxzi.baihongyu.com/

你可能感兴趣的文章
linux杀死进程详解
查看>>
字符串表示的IP地址与点分式表示的IP地址间的相互转化
查看>>
implicit declaration of function 这种警告问题的原因及解决方法
查看>>
utorrent如何处理占资源过大的问题
查看>>
<好文分享>妖怪和和尚过河问题
查看>>
uTP协议的前世今生(from wikipedia)
查看>>
uTP协议的前世今生(from wikipedia)
查看>>
utp的包头格式<2>
查看>>
开源搜索引擎的比较(收藏几个博客文章)最近要做搜索系统的研究方向
查看>>
asii码表
查看>>
<读书笔记>WebUsage Mining:Discovery and Applications of Usage Patterns from Web Data
查看>>
并查集(Disjoint Sets)
查看>>
在Linux下安装MATLAB
查看>>
readme
查看>>
微服务概念
查看>>
数据库分库分表
查看>>
hibernate inverse 和cascade讲解
查看>>
建模工具Rose的学习
查看>>
javascript ajax提出异步请求
查看>>
Hibernate 中的 QBC
查看>>