Netty 原理总结

为什么使用 Netty

  1. 实现协议的局限性
    今天,我们使用通用的应用程序或者类库来实现互相通讯,比如,我们经常使用一个 HTTP 客户端库来从 web 服务器上获取信息,或者通过 web 服务来执行一个远程的调用。
    然而,有时候一个通用的协议或他的实现并没有很好的满足需求。比如我们无法使用一个通用的 HTTP 服务器来处理大文件、电子邮件以及近实时消息,比如金融信息和多人游戏数据。我们需要一个高度优化的协议来处理一些特殊的场景。例如你可能想实现一个优化了的 Ajax 的聊天应用、媒体流传输或者是大文件传输器,你甚至可以自己设计和实现一个全新的协议来准确地实现你的需求。
    另一个不可避免的情况是当你不得不处理遗留的专有协议来确保与旧系统的互操作性。在这种情况下,重要的是我们如何才能快速实现协议而不牺牲应用的稳定性和性能。
  2. 使用 Netty 可以有效改善这种情况
    Netty 是一个提供 asynchronous event-driven (异步事件驱动)的网络应用框架,是一个用以快速开发高性能、高可靠性协议的服务器和客户端。
    换句话说,Netty 是一个 NIO 客户端服务器框架,使用它可以快速简单地开发网络应用程序,比如服务器和客户端的协议。Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。
    “快速和简单”并不意味着应用程序会有难维护和性能低的问题,Netty 是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如 FTP、SMTP、HTTP、许多二进制和基于文本的传统协议.因此,Netty 已经成功地找到一个方式,在不失灵活性的前提下来实现开发的简易性,高性能,稳定性。
    有一些用户可能已经发现其他的一些网络框架也声称自己有同样的优势,所以你可能会问是 Netty 和它们的不同之处。答案就是 Netty 的哲学设计理念。Netty 从开始就为用户提供了用户体验最好的 API 以及实现设计。正是因为 Netty 的哲学设计理念,才让您得以轻松地阅读本指南并使用 Netty。

架构总览

Netty架构总览
Netty 的架构由三部分组成——缓冲(buffer),通道(channel),事件模型(event model)——所有的高级特性都构建在这三个核心组件之上。

NIO

  1. 想了解 Aio 与 Nio 的利弊,为什么 Netty 没有采用 Aio 实现?

NIO 基于传输层,可以自定义数据处理逻辑来作为应用层,或者基于现有的 HTTP 组件进行升级,在线上环境这样的升级会带来一些兼容性问题,HTTP 已有相应的协议升级机制:Protocol upgrade mechanism

NIO 相对 BIO 优势:

  1. 零拷贝
    零拷贝减少线程上下文切换次数,且数据直接拷贝到内核空间,不占用 JVM 堆空间;
  2. 减少线程资源浪费
    NIO 可以一个线程监听多个 Socket 的连接、读、写请求,而不是像 BIO 那样每个 Socket 创建一个线程,但是同时会有一个问题:

Netty 核心组件

  1. Channel 和 ChannelHandler
  2. ByteBuf
  3. Pipeline

服务端

Netty流程

代码

下面是一个启动Netty服务端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 空闲检测
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(15, 0, 0,
TimeUnit.MINUTES));

// 半包/粘包分解器
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(2048, true, getFirstBytes()
));
ch.pipeline().addLast(其他Handler比如解码之类的);
}
}).option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.bind(10885).sync()

创建EventLoop

在上面的代码中,出现了bossGroupworkerGroup,bossGroup主要负责监听连接,拿到连接后,交给workerGroup中的线程来监听读或写事件。
io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup
EventExecutorGroup会给每个线程创建一个EventLoop

io.netty.channel.nio.NioEventLoop#NioEventLoop
newChild()创建EventLoop实例,其默认实现是NioEventLoop

io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
服务器初始化过程中创建了个线程池ThreadPerTaskExecutor

  • 每次执行任务都会构造一个线程执行
    io.netty.util.concurrent.ThreadPerTaskExecutor#execute

创建及初始化 ServerSocketChannel

Netty 有一个叫做 Channel 的统一的异步 I/O 编程接口,这个编程接口抽象了所有点对点的通信操作。也就是说,如果你的应用是基于 Netty 的某一种传输实现,那么同样的,你的应用也可以运行在 Netty 的另一种传输实现上。Netty 提供了几种拥有相同编程接口的基本传输实现:

  • 基于 NIO 的 TCP/IP 传输 (见 io.netty.channel.nio),
  • 基于 OIO 的 TCP/IP 传输 (见 io.netty.channel.oio),
  • 基于 OIO 的 UDP/IP 传输, 和
  • 本地传输 (见 io.netty.channel.local).

切换不同的传输实现通常只需对代码进行几行的修改调整,例如选择一个不同的 ChannelFactory 实现。
此外,你甚至可以利用新的传输实现没有写入的优势,只需替换一些构造器的调用方法即可,例如串口通信。而且由于核心 API 具有高度的可扩展性,你还可以完成自己的传输实现。

  1. 入口
    io.netty.bootstrap.AbstractBootstrap#bind(int)
    用户代码调用bind绑定端口时会触发Channel的创建和初始化

io.netty.bootstrap.ServerBootstrap#init
对Channel的使用可以追溯到这个init方法,包括Channel的创建、属性等的设置。

  1. 创建
    NioServerSocketChannel的构造方法 -> io.netty.channel.socket.nio.NioServerSocketChannel#newSocket
    可以看到,Netty中的ServerSocketChannel其实就对应JDK NIO中的ServerSocketChannel,在创建NioServerSocketChannel的同时创建了一个NIO中的ServerSocketChannel

  2. 初始化
    中间包含对childOptionschildAttrs等的设置。

  3. 添加一个连接处理器ServerBootstrapAcceptor

注册Selector

紧接着上面的初始化过程,接下来是注册NIO中的Selector。
io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)
总而言之最终还是使用NIO注册了 Selector。
io.netty.channel.nio.AbstractNioChannel#doRegister

启动 NioEventLoop

io.netty.bootstrap.AbstractBootstrap#doBind0
绑定端口号的同时,执行一个线程。
io.netty.util.concurrent.SingleThreadEventExecutor#startThread
NioEventLoop启动流程的最终启动了一个线程。
io.netty.channel.nio.NioEventLoop#run
该线程任务根据EventLoop的实现不同而有所不同,在NioEventLoop中,主要任务为以下3步:

  1. 接收事件(selectionKey)
    io.netty.channel.nio.NioEventLoop#select
    当检查没有需要处理的selectionKey时就会发生空轮询,Netty在轮询时会记录空轮询次数,当空轮询达到一定次数时,将之前注册的事件先取消,从而避免了NIO的空轮询Bug

  2. 检测新连接并创建NioSocketChannel
    io.netty.channel.nio.NioEventLoop#processSelectedKeys
    处理连接请求,并分发请求到pipeline
    io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

    • 每个连接创建一个ServerSocketChannel
      io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
    • 读取数据并分发到pipeline
      io.netty.channel.ChannelPipeline#fireChannelReadComplete
  3. 执行线程任务
    io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)

pipeline中的第一个ChannelHandler

pipeline的第一个Handler为ServerBootstrapAcceptor,它的主要任务包括:

  1. 将用户自定义ChannelHandler添加到pipeline

  2. 选择一个NioEventLoop传播事件
    io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)

  3. 注册selector
    代码流程非常长,但是最终可以跟到doRegister这个方法,可以发现最后还是调用了JDK的SocketChannel注册Selector。
    io.netty.channel.AbstractChannel.AbstractUnsafe#register0 -> io.netty.channel.nio.AbstractNioChannel#doRegister

  4. 注册读事件
    代码最后判断第一次连接则触发连接激活事件,代码位置仍然是上边的register0
    io.netty.channel.AbstractChannel.AbstractUnsafe#register0
    继续往下看可以看到最终将读事件(selectionKey)注册到了Selector
    io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
    -> io.netty.channel.DefaultChannelPipeline.HeadContext#readIfIsAutoRead
    -> io.netty.channel.nio.AbstractNioChannel#doBeginRead

选择EventLoop:
io.netty.util.concurrent.MultithreadEventExecutorGroup#chooser
每当有客户端连接进来时,Netty需要决定选择哪个EventLoop,这个工作是由EventExecutorChooser负责的:

  • GenericEventExecutorChooser:循环选择。
  • PowerOfTwoEventExecutorChooser:也是循环选择,只不过GenericEventExecutorChooser使用了取模运算,而PowerOfTwoEventExecutorChooser是通过位运算实现的。

Pipeline

  1. 创建Pipeline
    创建NioSocketChannel时会创建Pipeline:
    io.netty.channel.AbstractChannel#AbstractChannel
    Pipeline本身是一个双向链表的结构,且有两个哨兵节点headtail

  2. 添加Pipeline
    添加到链表
    io.netty.channel.ChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
    检查是否重复添加,如果加了@Sharable注解是可以重复添加的
    io.netty.channel.DefaultChannelPipeline#checkMultiplicity
    添加到链表末尾,也就是添加到tail节点的前面。
    io.netty.channel.DefaultChannelPipeline#addLast0

  3. 删除Pipeline
    有时候我们需要删除一个Pipeline上的某些ChannelHandler,比如已经进行过了授权校验,那下次就不需要再执行授权校验了,我们就可以直接把授权相关的那些ChannelHandler删除掉。
    首先遍历Pipeline找到目标ChannelHandler。
    io.netty.channel.DefaultChannelPipeline#getContextOrDie
    然后从Pipeline中移除。
    io.netty.channel.DefaultChannelPipeline#remove(AbstractChannelHandlerContext)

  4. inBound事件传播
    ChannelHandler中每个事件都有一个接口,ChannelInboundHandler专门处理输入事件,以channelRead为例。
    EventLoop会将读事件传给Pipeline,然后按责任链模式的逻辑从head节点开始传播事件。
    io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

  5. outBound事件传播
    ChannelOutboundHandler专门用于处理输出事件,以write为例。

    1
    2
    3
    4
    5
    6
    7
    public class EchoServerOutHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.channel().write("Hello");
    }
    }

    当我们在Handler中调用Context的write方法时,就是将写事件传给了Pipeline,Pipeline会从tail节点开始往前传播。
    io.netty.channel.AbstractChannelHandlerContext#write

心跳检测

应用协议层的心跳是必须的,它和 tcp keepalive 是完全不同的概念。应用层协议层的心跳检测的是连接双方的存活性,兼而连接质量,而 keepalive 检测的是连接本身的存活性。而且后者的超时时间默认过长,完全不能适应现代的网络环境。
Netty 内置通过增加 IdleStateHandler 产生 IDLE 事件进行便捷的心跳控制。你要处理的,就是心跳超时的逻辑,比如延迟重连。但它的轮训时间是固定的,无法动态修改,高级功能需要自己定制。
不同场景下需要切换不同的保活机制,在一些客户端比如 Android,频繁心跳的唤起会浪费大量的网络和电量,它的心跳策略会更加复杂一些。

优雅退出

Java 的优雅停机通常通过注册 JDK ShutdownHook 来实现。
Runtime.getRuntime().addShutdownHook();
一般通过 kill -15 进行 java 进程的关闭,以便在进程死亡之前进行一些清理工作。

注意:kill -9 会立马杀死进程,不给遗言的机会,比较危险。

虽然 netty 做了很多优雅退出的工作,通过 EventLoopGroup 的 shutdownGracefully 方法对 nio 进行了一些状态设置,但在很多情况下,这还不够多。它只负责单机环境的优雅关闭。
流量可能还会通过外层的路由持续进入,造成无效请求。一种可行的做法是首先在外层路由进行一次本地实例的摘除,把流量截断,然后再进行 netty 本身的优雅关闭。

示例协议实现

不少中间件会实现自己的协议,比如 Redis、MySQL,MyCat、TiDB 用的就是 MySQL 协议。
netty 默认实现了 dns、haproxy、http、http2、memcache、mqtt、redis、smtp、socks、stomp、xml 等协议。
协议分为两种:

  • 文本协议在调试起来是比较直观和容易的,但安全性欠佳;
  • 二进制协议就需要依赖日志、wireshark 等其他方式进行分析,增加了开发难度。
  1. 示例协议 - echo
  2. 示例协议 - discard
  3. 示例协议 - uptime
  4. 示例二进制协议 - factorial
  5. 示例文本协议 - telnet

数据结构 - ByteBuf

Netty 使用自建的 buffer API,而不是使用 NIO 的 ByteBuffer 来表示一个连续的字节序列。与 ByteBuffer 相比这种方式拥有明显的优势。Netty 使用新的 buffer 类型 ByteBuf,被设计为一个可从底层解决 ByteBuffer 问题,并可满足日常网络应用开发需要的缓冲类型。这些很酷的特性包括:

  • 如果需要,允许使用自定义的缓冲类型。
  • 复合缓冲类型中内置的透明的零拷贝实现。
  • 开箱即用的动态缓冲类型,具有像 StringBuffer 一样的动态缓冲能力。
  • 不再需要调用的 flip()方法。
  • 正常情况下具有比 ByteBuffer 更快的响应速度。

ByteBuf结构
以上就是一个 ByteBuf 的结构图,从上面这幅图可以看到

  1. ByteBuf 是一个字节容器,容器里面的的数据分为三个部分,第一个部分是已经丢弃的字节,这部分数据是无效的;第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;最后一部分的数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段。最后一部分虚线表示的是该 ByteBuf 最多还能扩容多少容量
  2. 以上三段内容是被两个指针给划分出来的,从左到右,依次是读指针(readerIndex)、写指针(writerIndex),然后还有一个变量 capacity,表示 ByteBuf 底层内存的总容量
  3. 从 ByteBuf 中每读取一个字节,readerIndex 自增 1,ByteBuf 里面总共有 writerIndex-readerIndex 个字节可读, 由此可以推论出当 readerIndex 与 writerIndex 相等的时候,ByteBuf 不可读
  4. 写数据是从 writerIndex 指向的部分开始写,每写一个字节,writerIndex 自增 1,直到增到 capacity,这个时候,表示 ByteBuf 已经不可写了
  5. ByteBuf 里面其实还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错

使用 ByteBuf 有以下好处:

  1. 可以有效地区分可读数据和可写数据,读写之间相互没有冲突
  2. Extensibility 可扩展性
    ByteBuf 具有丰富的操作集,可以快速的实现协议的优化。例如,ByteBuf 提供各种操作用于访问无符号值和字符串,以及在缓冲区搜索一定的字节序列。你也可以扩展或包装现有的缓冲类型用来提供方便的访问。自定义缓冲式仍然实现自 ByteBuf 接口,而不是引入一个不兼容的类型
  3. Transparent Zero Copy 透明的零拷贝
    网络应用中需要减少内存拷贝操作次数。你可能有一组缓冲区可以被组合以形成一个完整的消息。网络提供了一种复合缓冲,允许你从现有的任意数的缓冲区创建一个新的缓冲区而无需内存拷贝。例如,一个信息可以由两部分组成:header 和 body。在一个模块化的应用,当消息发送出去时,这两部分可以由不同的模块生产和装配。
    1
    2
    3
    +--------+------+
    | header | body |
    +--------+------+
    如果你使用的是 ByteBuffer ,你必须要创建一个新的大缓存区用来拷贝这两部分到这个新缓存区中。或者,你可以在 NIO做一个收集写操作,但限制你将复合缓冲类型作为 ByteBuffer 的数组而不是一个单一的缓冲区,这样打破了抽象,并且引入了复杂的状态管理。此外,如果你不从 NIO channel 读或写,它是没有用的。
    1
    2
    // 复合类型与组件类型不兼容。
    ByteBuffer[] message = new ByteBuffer[] { header, body };
    通过对比, ByteBuf 不会有警告,因为它是完全可扩展并有一个内置的复合缓冲区。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 复合类型与组件类型是兼容的。
    ByteBuf message = Unpooled.wrappedBuffer(header, body);
    // 因此,你甚至可以通过混合复合类型与普通缓冲区来创建一个复合类型。
    ByteBuf messageWithFooter = Unpooled.wrappedBuffer(message, footer);
    // 由于复合类型仍是 ByteBuf,访问其内容很容易,
    //并且访问方法的行为就像是访问一个单独的缓冲区,
    //即使你想访问的区域是跨多个组件。
    //这里的无符号整数读取位于 body 和 footer
    messageWithFooter.getUnsignedInt(
    messageWithFooter.readableBytes() - footer.readableBytes() - 1);
  4. Automatic Capacity Extension 自动容量扩展
    许多协议定义可变长度的消息,这意味着没有办法确定消息的长度,直到你构建的消息。或者,在计算长度的精确值时,带来了困难和不便。这就像当你建立一个字符串。你经常估计得到的字符串的长度,让 StringBuffer 扩大了其本身的需求。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 一种新的动态缓冲区被创建。在内部,实际缓冲区是被“懒”创建,从而避免潜在的浪费内存空间。
    ByteBuf b = Unpooled.buffer(4);
    // 当第一个执行写尝试,内部指定初始容量 4 的缓冲区被创建
    b.writeByte('1');
    b.writeByte('2');
    b.writeByte('3');
    b.writeByte('4');
    // 当写入的字节数超过初始容量 4 时,
    //内部缓冲区自动分配具有较大的容量
    b.writeByte('5');
  5. Better Performance 更好的性能
    最频繁使用的缓冲区 ByteBuf 的实现是一个非常薄的字节数组包装器(比如,一个字节)。与 ByteBuffer 不同,它没有复杂的边界和索引检查补偿,因此对于 JVM 优化缓冲区的访问更加简单。更多复杂的缓冲区实现是用于拆分或者组合缓存,并且比 ByteBuffer 拥有更好的性能。

粘包拆包和半包合并

基于流的传输比如 TCP/IP, 接收到数据是存在 socket 接收的 buffer 中。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。造成粘包的原因,主要是由于缓冲区的介入,所以需要严格约定去所传输的包的格式——何时开始何时结束。意味着,即使你发送了 2 个独立的数据包,操作系统也不会作为 2 个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。举个例子,让我们假设操作系统的 TCP/TP 协议栈已经接收了 3 个数据包,在应用程序中读取数据的时候可能被分成下面的片段:
粘包和半包问题
因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意义并且能够让程序的业务逻辑更好理解的数据。
在没有 Netty 的情况下,用户如果自己需要拆包,基本原理就是不断从 TCP 缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包

  • 半包:如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。
  • 粘包:如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。

解码器 - ByteToMessageDecoder

入口:io.netty.handler.codec.ByteToMessageDecoder#channelRead

  1. 累加字节流
    累加器累加已读入的字节数,如果超过ByteBuf当前可读入的空间大小,则执行扩容。
    io.netty.handler.codec.ByteToMessageDecoder.Cumulator#cumulate
  2. 调用子类的decode方法进行解析(模板方法)
    io.netty.handler.codec.ByteToMessageDecoder#callDecode
  3. 将子类解析出的ByteBuf向下传播
    io.netty.handler.codec.ByteToMessageDecoder#fireChannelRead(io.netty.channel.ChannelHandlerContext, io.netty.handler.codec.CodecOutputList, int)

Netty中的一些拆箱即用的解码器

如果要自己实现所有协议的拆包无疑是非常麻烦的,实际上 Netty 已经自带了一些开箱即用的拆包器:

  1. 固定长度的拆包器 FixedLengthFrameDecoder
    如果你的应用层协议非常简单,每个数据包的长度都是固定的,比如 100,那么只需要把这个拆包器加到 pipeline 中,Netty 会把一个个长度为 100 的数据包 (ByteBuf) 传递到下一个 channelHandler。
  2. 行拆包器 LineBasedFrameDecoder
    从字面意思来看,发送端发送数据包的时候,每个数据包之间以换行符作为分隔,接收端通过 LineBasedFrameDecoder 将粘过的 ByteBuf 拆分成一个个完整的应用层数据包。
  3. 分隔符拆包器 DelimiterBasedFrameDecoder
    DelimiterBasedFrameDecoder 是行拆包器的通用版本,只不过我们可以自定义分隔符。
  4. 基于长度域拆包器 LengthFieldBasedFrameDecoder
    最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。由于上面三种拆包器比较简单,读者可以自行写出 demo,接下来,我们就结合我们小册的自定义协议,来学习一下如何使用基于长度域的拆包器来拆解我们的数据包。

编码 - MessageToByteEncoder

编码器是一个ChannelHandler,一般是第一个添加到Pipeline内,然后write的最后会将数据进行编码再输出。

  1. 匹配对象
    io.netty.handler.codec.MessageToByteEncoder#acceptOutboundMessage
  2. 内存分配
    io.netty.handler.codec.MessageToByteEncoder#allocateBuffer
  3. 调用子类的编码实现
    io.netty.handler.codec.MessageToByteEncoder#encode
  4. 释放内存
    io.netty.util.ReferenceCountUtil#release(java.lang.Object)
  5. 放到Pipeline里传播
    默认情况下会一直传播到head节点
    io.netty.channel.ChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
    io.netty.channel.Channel.Unsafe#write
  6. 输出
    将数据暂存到ByteBuf,将堆内对象转换为堆外内存
    io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
    插入写队列
    io.netty.channel.ChannelOutboundBuffer#addMessage
    TODO: 什么时候刷新buffer队列?

自定义数据处理逻辑

基于拦截链模式的事件模型 - pipeline

一个定义良好并具有扩展能力的事件模型是事件驱动开发的必要条件。Netty 具有定义良好的 I/O 事件模型。由于严格的层次结构区分了不同的事件类型,因此 Netty 也允许你在不破坏现有代码的情况下实现自己的事件类型。这是与其他框架相比另一个不同的地方。很多 NIO 框架没有或者仅有有限的事件模型概念;在你试图添加一个新的事件类型的时候常常需要修改已有的代码,或者根本就不允许你进行这种扩展。
在 Netty 中一条连接对应一个 Channel,该 Channel 的所有处理逻辑都在一个 ChannelPipeline 对象内,ChannelPipeline 是一个双向链表结构,在一个 ChannelPipeline 内部一个 ChannelEvent 被一组 ChannelHandler 处理。这个管道是 Intercepting Filter (拦截过滤器)模式的一种高级形式的实现,因此对于一个事件如何被处理以及管道内部处理器间的交互过程,你都将拥有绝对的控制力。例如,你可以定义一个从 socket 读取到数据后的操作:

1
2
3
4
5
6
7
8
9
public class MyReadHandler implements SimpleChannelHandler {
public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {
Object message = evt.getMessage();
// Do something with the received message.
...
// And forward the event to the next handler.
ctx.sendUpstream(evt);
}
}

同时你也可以定义一种操作响应其他处理器的写操作请求:

1
2
3
4
5
6
7
8
9
public class MyWriteHandler implements SimpleChannelHandler {
public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt) {
Object message = evt.getMessage();
// Do something with the message to be written.
...
// And forward the event to the next handler.
ctx.sendDownstream(evt);
}
}

ChannelHandler 分为两种:

  • ChannelInboundHandler
    处理读数据逻辑,核心方法是 channelRead。
  • ChannelOutBoundHandler
    处理些数据逻辑,核心方法是 write,在链式处理中总是位于 ChannelInboundHandler 之后。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// inBound,处理读数据的逻辑链
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());

// outBound,处理写数据的逻辑链
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});

其执行顺序如下图所示:
pipeline执行顺序

异常处理

netty 由于其异步化的开发方式,以及其事件机制,在异常处理方面就显得异常重要。为了保证连接的高可靠性,许多异常需要静悄悄的忽略,或者在用户态没有感知。
netty 的异常会通过 pipeline 进行传播,所以在任何一层进行处理都是可行的,但编程习惯上,习惯性抛到最外层集中处理。
为了最大限度的区别异常信息,通常会定义大量的异常类,不同的错误会抛出不同的异常。发生异常后,可以根据不同的类型选择断线重连(比如一些二进制协议的编解码紊乱问题),或者调度到其他节点。

Codec 框架

我们可以使用 POJO 代替 ChannelBuffer,从业务逻辑代码中分离协议处理部分总是一个很不错的想法。然而如果一切从零开始便会遭遇到实现上的复杂性。你不得不处理分段的消息。一些协议是多层的(例如构建在其他低层协议之上的协议)。一些协议过于复杂以致难以在一台独立状态机上实现。
因此,一个好的网络应用框架应该提供一种可扩展,可重用,可单元测试并且是多层的 codec 框架,为用户提供易维护的 codec 代码。
Netty 提供了一组构建在其核心模块之上的 codec 实现,这些简单的或者高级的 codec 实现帮你解决了大部分在你进行协议处理开发过程会遇到的问题,无论这些协议是简单的还是复杂的,二进制的或是简单文本的。

SSL / TLS 支持

不同于传统阻塞式的 I/O 实现,在 NIO 模式下支持 SSL 功能是一个艰难的工作。你不能只是简单的包装一下流数据并进行加密或解密工作,你不得不借助于 javax.net.ssl.SSLEngine,SSLEngine 是一个有状态的实现,其复杂性不亚于 SSL 自身。你必须管理所有可能的状态,例如密码套件,密钥协商(或重新协商),证书交换以及认证等。此外,与通常期望情况相反的是 SSLEngine 甚至不是一个绝对的线程安全实现。
在 Netty 内部,SslHandler 封装了所有艰难的细节以及使用 SSLEngine 可 能带来的陷阱。你所做的仅是配置并将该 SslHandler 插入到你的 ChannelPipeline 中。同样 Netty 也允许你实现像 StartTlS 那样所拥有的高级特性,这很容易。

HTTP 实现

HTTP 无 疑是互联网上最受欢迎的协议,并且已经有了一些例如 Servlet 容器这样的 HTTP 实现。因此,为什么 Netty 还要在其核心模块之上构建一套 HTTP 实现?
与现有的 HTTP 实现相比 Netty 的 HTTP 实现是相当与众不同的。在 HTTP 消息的低层交互过程中你将拥有绝对的控制力。这是因为 Netty 的 HTTP 实现只是一些 HTTP codec 和 HTTP 消息类的简单组合,这里不存在任何限制——例如那种被迫选择的线程模型。你可以随心所欲的编写那种可以完全按照你期望的工作方式工作的客户端或服务器端代码。这包括线程模型,连接生命期,快编码,以及所有 HTTP 协议允许你做的,所有的一切,你都将拥有绝对的控制力。
由于这种高度可定制化的特性,你可以开发一个非常高效的 HTTP 服务器,例如:

  • 要求持久化链接以及服务器端推送技术的聊天服务(如,Comet )
  • 需要保持链接直至整个文件下载完成的媒体流服务(如,2 小时长的电影)
  • 需要上传大文件并且没有内存压力的文件服务(如,上传 1GB 文件的请求)
  • 支持大规模混合客户端应用用于连接以万计的第三方异步 web 服务。

WebSockets 实现

WebSockets 允许双向,全双工通信信道,在 TCP socket 中。它被设计为允许一个 Web 浏览器和 Web 服务器之间通过数据流交互。
WebSocket 协议已经被 IETF 列为 RFC 6455 规范。
Netty 已经实现了 WebSocket 和一些老版本的规范:http://netty.io/4.0/api/io/netty/handler/codec/http/websocketx/package-frame.html

Google Protocol Buffer 整合

Google Protocol Buffers 是快速实现一个高效的二进制协议的理想方案。通过使用 ProtobufEncoderProtobufDecoder,你可以把 Google Protocol Buffers 编译器 (protoc) 生成的消息类放入到 Netty 的 codec 实现中。请参考“LocalTime”实例,这个例子也同时显示出开发一个由简单协议定义 的客户及服务端是多么的容易。

性能优化

FastThreadLocal

重写了JDK的ThreadLocal,但是速度更快

Recycle

对象池

单机百万连接

Netty应用级别性能优化

QA

如何使用 Netty

Netty 是 Java 中的一个 NIO 框架:

  1. 易用的 API;
  2. NIO 模型相对 BIO 更高效。
  3. 解决了 Java 原生 NIO 接口存在的一些问题。
    包括粘包半包问题、心跳检测等问题。

ServerBootstrap - 默认情况下Netty服务端会起多个线程?又是什么时候启动这些线程的?

Netty中线程主要用于执行EventLoop的for循环任务,当ServerBootstrap
默认情况下创建2倍CPU核心线程数的线程。
io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
可以看到最终创建了个线程池ThreadPerTaskExecutor
io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)

ServerBootstrap - Netty是如何解决JDK的空轮询Bug的?

NioEventLoop

ServerBootstrap - Netty是如何保证异步串行无锁化的?

执行需要保证并发安全的操作时先判断是否是刚开始创建的线程,如果不是则放入一个单线程的线程池中执行。
线程创建位置:SingleThreadEventExecutor的构造方法
判断位置:io.netty.util.concurrent.AbstractEventExecutor#inEventLoop

NioEventLoop - Netty如何检测新连接的接入?

初始化ServerBootstrap时

NioEventLoop - 新连接怎样被注册NioEventLoop线程?

调用bind时会启动一个NioEventLoop线程,用于监听连接请求。

pipeline - Netty如何判断ChannelHandler类型?

ChannelHandler分为Inbound类型和Outbound类型,在Netty中将ChannelHandler添加到Pipeline时会判断这个ChannelHandler的类型,然后设置到一个bool类型的成员变量里,在传播时使用。
io.netty.channel.DefaultChannelHandlerContext#isInbound
io.netty.channel.DefaultChannelHandlerContext#isOutbound

pipeline - 对ChannelHandler的添加会遵循什么样的顺序?

根据Pipeline的传播逻辑可以看出,Inbound类型的ChannelHandler按添加顺序传播,而Outbound类型的ChannelHandler是按逆顺序传播的。

pipeline - 用户手动触发事件传播,不同的触发方式有什么区别?

如果是在Pipeline中间的某个ChannelHandler中调用了read,则就是从这个节点开始往后传播,如果是write,就是从这个节点开始往前传播。

ByteBuf - 内存的类别有哪些?

ByteBuf - 如何减少多线程之间内存分配的竞争?

ByteBuf - 不同大小的内存是如何进行分配的?

ByteBuf - 粘包半包问题是什么

解码器抽象的解码过程?

Netty里面有哪些拆箱即用的解码器?

如何把对象变成字节流,并最终写到socket底层?

如何使用Netty实现长短连接?

长连接是为了复用连接资源,长连接下,多个请求可以使用同一个连接传输数据包。

如何使用Netty实现长短轮询?

长轮询的特点是请求发到服务器上时若没有资源(比如库存),请求会被挂起,直到资源充足后才返回。

参考

  1. Netty 4.x 用户指南
  2. User guide for 4.x(上面这个文档的英文原版)
  3. github - netty / netty
  4. Netty Source Xref (4.0.56.Final)(同上为源码)