April 15, 2021

Production Practice - A Primer on NIO and Its Applications

This article covers how IO models affect system performance and how to use Netty to build a production-grade, NIO-based communication component.

[TOC]

IO Models

BIO/OIO

Synchronous blocking model

Blocking IO: the caller blocks waiting for data until it is ready.

NIO (New IO / Non-Blocking IO)

Synchronous non-blocking model

Non-blocking IO: the caller is notified once data is ready, then reads it itself.

AIO

Asynchronous non-blocking model

Asynchronous IO: once data is ready it is read automatically and delivered via callback.

Terminology

  • Blocking / non-blocking describes the read/write action itself:

    • [Blocking] data not ready -> read blocks / buffer full -> write blocks
    • [Non-blocking] reads and writes return immediately; you wait for a notification instead
  • Synchronous / asynchronous describes who reads the data once it is ready:

    • [Synchronous] the program reads it itself
    • [Asynchronous] the program just receives a callback
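A minimal JDK sketch of the blocking vs. non-blocking half of this distinction (host and port are placeholders); the blocking read parks the calling thread, while the non-blocking read returns immediately with 0 when nothing has arrived yet:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class BlockingVsNonBlocking {
    public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(1024);

        // Blocking mode: read() parks this thread until at least one byte arrives.
        try (SocketChannel ch = SocketChannel.open(new InetSocketAddress("example.com", 80))) {
            ch.read(buf);
        }

        // Non-blocking mode: read() returns at once; 0 means "nothing yet, wait for a notification".
        try (SocketChannel ch = SocketChannel.open(new InetSocketAddress("example.com", 80))) {
            ch.configureBlocking(false);
            int n = ch.read(buf);
            System.out.println("bytes available without waiting: " + n);
        }
    }
}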

Classic interview question: why is single-threaded Redis so fast?

High performance = high throughput + low latency. A counter-intuitive question like this is best analyzed from multiple angles.

Memory-based (low latency)

  • Memory reads and writes are far faster than disk

The cost of multithreading (low latency / high throughput)

  • Thread context switches
  • Protecting shared resources (locks)
  • Spawning new threads without limit drags system throughput down

Redis is a "pseudo single-threaded" model (low latency)

Improving Redis Performance through Multi-Thread Processing[^alibaba_database_team]

Redis is generally known as a single-process, single-thread model. This is not true. Redis also runs multiple backend threads to perform backend cleaning works, such as cleansing the dirty data and closing file descriptors.

In Redis, the main thread is responsible for the major tasks, including but not limited to: receiving the connections from clients, processing the connection read/write events, parsing requests, processing commands, processing timer events, and synchronizing data.

  • "Single-threaded" Redis usually refers to how the main tasks are handled:
    • receiving data
    • handling IO events
    • parsing requests
    • processing commands
    • handling timer events
  • Handled by other threads:
    • background cleanup tasks
    • master-replica synchronization

High-performance data structures (low latency)

  • hash
  • ziplist / quicklist
  • skiplist

IO multiplexing model (high throughput, the C10K problem)

Single Reactor, single thread

Keeps system throughput high under IO-intensive load: one thread services many IO streams.

Redis IO

Steps that could block are switched to non-blocking mode:

  • BIND/LISTEN
  • ACCEPT (blocking: trying to establish a connection, the TCP handshake phase)
  • RECV (blocking: trying to read data)
  • PARSE
  • GET
  • SEND

Single-threaded Redis lets the kernel hold multiple accept sockets and recv sockets at the same time:

  • Linux select/epoll
  • Windows IOCP (essentially AIO)
  • macOS/FreeBSD kqueue

Redis registers the ACCEPT/RECV events with the OS kernel (leaving the Redis thread free for other work); the OS watches the sockets and notifies Redis when an event fires.

Event-driven: built on event callbacks, epoll appends triggered events to an event queue, and the Redis processing thread simply keeps consuming and handling that queue.
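A JDK-level sketch of that loop (the port is illustrative; on Linux the JDK Selector is backed by epoll, on macOS by kqueue). One thread multiplexes the listening socket and every client socket:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class MiniReactor {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(6379));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);   // register the ACCEPT event

        while (true) {
            selector.select();                               // block until events are queued
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                    // new connection: watch it for reads
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // RECV is ready: parse / execute / reply here without blocking
                }
            }
        }
    }
}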

NIO in Netty

Choosing an IO model

  • BIO's blocking model is a poor fit for IO-intensive scenarios (high latency / low throughput)
  • @Deprecated AIO
    • Poorly supported on Linux
    • Windows IOCP is a mature implementation, but Windows is rarely used as a server environment

The Reactor pattern[^scalable_in_io]

Reactor is a development pattern: register events => scan for events => dispatch events. BIO corresponds to Thread-Per-Connection, AIO to Proactor. Netty implements NIO with the Reactor pattern:

  • SocketChannel (client): OP_CONNECT / OP_WRITE / OP_READ
  • ServerSocketChannel (server): OP_ACCEPT
  • SocketChannel (server): OP_WRITE / OP_READ

Components

  • Reactor

Reactor responds to IO events by dispatching the appropriate handler.

The Reactor listens for the relevant events and dispatches each to the matching Handler.

  • Handler

Handlers perform non-blocking actions

Thread-Per-Connection

![image.png](https://img.check321.net/20210415/upload_dc1a4fe8c926dc4f24d949399b328452.png)

  • Read -> decode -> compute (business handler) -> encode -> send all happen on the same request thread (sketched below)
  • Heavy on resources, and every step blocks
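For contrast, a minimal thread-per-connection sketch (port and echo logic are illustrative); each blocking read pins an entire thread to a single connection:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BioEchoServer {
    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket socket = server.accept();              // blocks waiting for a connection
                new Thread(() -> {                            // one thread per connection
                    try (InputStream in = socket.getInputStream();
                         OutputStream out = socket.getOutputStream()) {
                        byte[] buf = new byte[1024];
                        int n;
                        while ((n = in.read(buf)) != -1) {    // read blocks this thread
                            out.write(buf, 0, n);             // decode/compute/encode would sit here
                        }
                    } catch (IOException ignored) {
                    }
                }).start();
            }
        }
    }
}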

Single Reactor, single thread

Divide and Conquer

  • "Each task performs an action without blocking" 用分治的思想将处理流程拆解成子,各子任务通过nio做到非阻塞
  • "Non-blocking reads and writes Dispatch tasks associated with sensed IO events" 将各子任务用IO事件关联起来

image.png

  • The Reactor and all Handlers share one thread; the OP_ACCEPT connection event goes to the Acceptor, all other events to the Reactor
  • A single process cannot exploit multiple CPU cores
  • All events share one thread, so any blocking operation degrades overall processing

Single Reactor, multiple worker threads

  • "Reactors should quickly trigger handlers" / "Handler processing slows down Reactor": the Reactor that dispatches IO events should stay lean and fast, never blocked by business handlers
  • The Reactor focuses purely on managing read/write events; the rest (encoding / decoding / computation) goes to a separate thread pool

image.png

  • The single-threaded Reactor focuses on event handling: OP_ACCEPT goes to the Acceptor, OP_READ/OP_WRITE to the matching Handler
  • Handlers run in parallel on a thread pool
  • Parallelism also means paying extra for thread safety

Main-Sub Reactor

  • "Use to match CPU and IO rates": balances CPU and IO utilization

image.png

  • The single-Reactor problem is that connection events share a thread with everything else: with many connections, or when connection setup is expensive (e.g. SSL), other events are starved, and vice versa

  • The main-sub Reactor dedicates the main Reactor to connection events; once a connection is established it is handed to a sub Reactor for all other events, keeping the two responsibilities cleanly separated

Core components - BECH (Bootstrap / EventLoop / Channel / Handler)

Bootstrap

Netty's bootstrap classes, which assemble the Reactor:

io.netty.bootstrap.ServerBootstrap / io.netty.bootstrap.Bootstrap
  • Single-threaded Reactor
    ServerBootstrap bootstrap = new ServerBootstrap();
    NioEventLoopGroup singleThreadExecutor = new NioEventLoopGroup(1);

    bootstrap.group(singleThreadExecutor);
  • Multithreaded Reactor
    ServerBootstrap bootstrap = new ServerBootstrap();
    NioEventLoopGroup executors = new NioEventLoopGroup();

    bootstrap.group(executors);
  • Main-sub multithreaded Reactor
    ServerBootstrap bootstrap = new ServerBootstrap();

    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(16);

    bootstrap.group(boss, worker);

EventLoop

  • The Selector of the Reactor pattern: Channels register here to have their IO events watched
  • io.netty.channel.nio.NioEventLoopGroup: a group of multiple Selectors

SocketChannel

Represents a socket connection and exposes its connect/close/read/write operations.

  • Server side: io.netty.channel.socket.ServerSocketChannel
  • Client side: io.netty.channel.socket.SocketChannel

ChannelHandler

Use the Bootstrap to register a handler chain on the Channel:

bootstrap.group(boss, work)
        .channel(NioServerSocketChannel.class)
        .option(NioChannelOption.SO_BACKLOG,1000) // size the accept queue for bursts of incoming connections.
        .childOption(NioChannelOption.TCP_NODELAY,true) // disable Nagle's algorithm for lower latency.
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline()
                        .addLast(new DropletIdleStateHandler())
                        .addLast(new LengthSpliter())
                        .addLast(pacCodecHandler)
                        .addLast(authHandler)
                        .addLast(heartBeatReqHandler)
                        .addLast(authManageHandler)
                        .addLast(operationExecutor,operationReqHandler);
            }
        });
Read handler

io.netty.channel.ChannelInboundHandler

Write handler

io.netty.channel.ChannelOutboundHandler

Handler chain

io.netty.channel.ChannelPipeline

 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
Execution order

Note that inbound and outbound handlers traverse the pipeline in opposite directions:

 * For example, let us assume that we created the following pipeline:
 * <pre>
 * {@link ChannelPipeline} p = ...;
 * p.addLast("1", new InboundHandlerA());
 * p.addLast("2", new InboundHandlerB());
 * p.addLast("3", new OutboundHandlerA());
 * p.addLast("4", new OutboundHandlerB());
 * p.addLast("5", new InboundOutboundHandlerX());
  • For reads (inbound): 1 -> 2 -> 5
  • For writes (outbound): 5 -> 4 -> 3 (verified in the sketch below)
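The ordering is easy to verify with EmbeddedChannel, Netty's in-memory test channel; a small sketch (handler names are illustrative):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;

public class PipelineOrderDemo {
    static class In extends ChannelInboundHandlerAdapter {
        private final String name;
        In(String name) { this.name = name; }
        @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("inbound " + name);
            ctx.fireChannelRead(msg);          // hand over to the next inbound handler
        }
    }
    static class Out extends ChannelOutboundHandlerAdapter {
        private final String name;
        Out(String name) { this.name = name; }
        @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise p) {
            System.out.println("outbound " + name);
            ctx.write(msg, p);                 // hand over to the previous outbound handler
        }
    }

    public static void main(String[] args) {
        EmbeddedChannel ch = new EmbeddedChannel(
                new In("1"), new In("2"), new Out("3"), new Out("4"));
        ch.writeInbound("read");   // prints: inbound 1, inbound 2
        ch.writeOutbound("write"); // prints: outbound 4, outbound 3
    }
}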

ByteBuf

ByteBuf is Netty's basic unit of data: reading and writing data ultimately means operating on io.netty.buffer.ByteBuf.

Layout
*      +-------------------+------------------+------------------+
 *      | discardable bytes |  readable bytes  |  writable bytes  |
 *      |                   |     (CONTENT)    |                  |
 *      +-------------------+------------------+------------------+
 *      |                   |                  |                  |
 *      0      <=      readerIndex   <=   writerIndex    <=    capacity

The readerIndex / writerIndex pointers split a ByteBuf into three regions (see the sketch below):

  • 0 ~ readerIndex: discardable (already read)
  • readerIndex ~ writerIndex: readable
  • writerIndex ~ capacity: writable
  • Each byte read advances readerIndex; when readerIndex == writerIndex -> isReadable() == false
  • Each byte written advances writerIndex; when writerIndex == capacity -> isWritable() == false (the buffer expands according to maxCapacity)
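A small sketch of the pointer movement on an unpooled buffer:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ByteBufIndexDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8);                // initial capacity 8
        System.out.println(buf.readerIndex() + "/" + buf.writerIndex()); // 0/0

        buf.writeInt(42);                                 // writerIndex += 4
        System.out.println(buf.readerIndex() + "/" + buf.writerIndex()); // 0/4

        buf.readInt();                                    // readerIndex += 4
        System.out.println(buf.isReadable());             // false: readerIndex == writerIndex

        buf.writeLong(1L);                                // needs 8 more bytes -> expands toward maxCapacity
        System.out.println(buf.capacity() >= 12);         // true

        buf.release();                                    // see the memory-management section below
    }
}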
Reading and writing

readXxx() calls move the readerIndex pointer:

 /**
     * Gets a byte at the current {@code readerIndex} and increases
     * the {@code readerIndex} by {@code 1} in this buffer.
     *
     * @throws IndexOutOfBoundsException
     *         if {@code this.readableBytes} is less than {@code 1}
     */
    public abstract byte  readByte();
// read a single byte
byteBuf.readByte()

// read an int
byteBuf.readInt();

// read a byte array of the given length
final byte[] bytes = new byte[length];

byteBuf.readBytes(bytes);

Likewise, writeXxx() calls move the writerIndex pointer:

 /**
     * Sets the specified 32-bit integer at the current {@code writerIndex}
     * and increases the {@code writerIndex} by {@code 4} in this buffer.
     * If {@code this.writableBytes} is less than {@code 4}, {@link #ensureWritable(int)}
     * will be called in an attempt to expand capacity to accommodate.
     */
    public abstract ByteBuf writeInt(int value);
    // write an int
   byteBuf.writeInt(Serializer.MAGIC_NUM);

    // write a single byte
   byteBuf.writeByte(Serializer.VER);

    // write a byte array
   byteBuf.writeBytes(bytes);
Memory management

Netty uses off-heap (direct) memory that the JVM cannot garbage-collect, so it manages ByteBuf lifetimes by reference counting (io.netty.util.ReferenceCounted).

  • io.netty.util.ReferenceCounted#retain():

    increments the reference count (a freshly created ByteBuf starts at 1)

  • io.netty.util.ReferenceCounted#release():

    decrements the reference count; the memory is reclaimed "if and only if the reference count became {@code 0} and this object has been deallocated"
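A typical consequence: a terminal inbound handler that consumes a ByteBuf owns it and must release it, otherwise the direct memory backing the buffer leaks. A minimal sketch:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class ConsumingHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        try {
            // ... consume buf here ...
        } finally {
            ReferenceCountUtil.release(buf); // refCnt 1 -> 0: buffer goes back to the allocator
        }
    }
}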

Droplet, built on Netty

The OpenAPI dilemma

Strengths

  • Mature: REST APIs are simple to implement and widely interoperable

Weaknesses

  • Protocol- and convention-heavy, demanding on both the API provider and the integrator
  • Real business flows push many slow operations into asynchronous APIs, and both push and pull models add integration cost for the consumer
  • REST over HTTP is stateless, so every call pays for extra authentication and signature steps
  • Customers ask for lots of customization: custom field names, custom feature endpoints

Requirements

Given the above, a replacement for the API clearly has to offer:

  • Works out of the box (integration cost)
  • Event-driven (asynchronous calls with a simple, timely notification mechanism)
  • Full-duplex long-lived connections (cheap, flexible communication)
  • An SDK (client-side customization)

Droplet (SIM)

Overall architecture

image.png

Module layout

├── sim
│   ├── CHANGELOG
│   ├── README.md
│   ├── droplet-structure.jpg
│   ├── pom.xml
│   ├── sim-client // SIM client
│   ├── sim-client-starter // SIM client Spring Boot starter
│   ├── sim-core // SIM core module
│   ├── sim-server  // SIM server
│   └── sim.iml

SIM-CORE

The SIM core module

├── SimCoreApplication.java
├── annotation
│   ├── DynamicFiled.java
│   ├── OperationHandler.java
│   ├── OperationService.java
│   └── PacHandler.java
├── cmd
│   └── Command.java
├── compiler
│   ├── BaseCompiler.java
│   └── PacCompiler.java
├── conf
│   └── serializer
├── entity
│   ├── BaseEntity.java
│   ├── TaxcoreConstant.java
│   ├── dto
│   ├── enums
│   ├── req
│   └── resp
├── handler
│   ├── LengthSpliter.java
│   ├── PacCodecHandler.java
│   ├── PacDecoder.java
│   └── PacEncoder.java
├── pac
│   ├── AuthReqPac.java
│   ├── AuthRespPac.java
│   ├── CertReqPac.java
│   ├── CertRespPac.java
│   ├── DemoReqPac.java
│   ├── DemoRespPac.java
│   ├── DevicePlateReqPac.java
│   ├── OperationPac.java
│   ├── OperationRespErrorPac.java
│   ├── OperationRespPac.java
│   ├── Pac.java
│   ├── PingPac.java
│   └── PongPac.java
├── serial
│   ├── JSONSerializer.java
│   ├── SerialAlgo.java
│   └── Serializer.java
└── util
    ├── Attributes.java
    ├── CommonUtils.java
    ├── Connection.java
    ├── DropletChannelContext.java
    ├── DropletConstant.java
    └── SIMBeanContext.java
Basic wire packet - PAC

SIM's basic wire format is called a Pac; its core field is the Command, and the different Pac implementations represent SIM's different communication actions.

public abstract class Pac extends BaseEntity {

    private Byte ver;

    public abstract Byte command();

    private Byte encoding;

    public Byte getEncoding() {

        return Optional.ofNullable(encoding)
                .orElse(Encoding.DEFAULT.getRef());

    }
}
Command

SIM identifies the command type with one byte: heartbeats, business commands, authentication actions, and so on.

public interface Command {

    byte PING = -128;

    byte PONG = 127;

    byte DEMO_OPERATION_REQ = 3;

    byte DEMO_OPERATION_RESP = 4;

    // ---------------------------------

    byte PLATE_OPERATION_REQ = 12;

    byte PLATE_OPERATION_RESP = 21;

    byte CERT_OPERATION_REQ = 13;

    byte CERT_OPERATION_RESP = 31;

    byte ERROR_OPERATION_RESP = 77;

    byte AUTH_OPERATION_REQ = 88;

    byte AUTH_OPERATION_RESP = 99;

}
Packet codec - PacCompiler
  • Implements the second-stage encode/decode of SIM's inbound and outbound data

public abstract class BaseCompiler<T> {

    public abstract void encode(ByteBuf byteBuf, T data);

    public abstract T decode(ByteBuf byteBuf);

    protected String buildSerializerByAlgo(Byte algo) {
        return MessageFormat.format(Serializer.SERIALIZER_TEMPLATE, algo);
    }

}
  • Data is encoded/decoded twice in transit

  • The second-stage codec implementation: net.smarttax.sim.compiler.PacCompiler

    • The wire protocol

      /**
      * @author fyang
      * @title Packet-Compiler.
      * @description sim-protocol:
      * v.1.0.0
      * | magic-number (4bytes) | ver (1byte) | serial-algorithm (1byte) | cmd (1byte) | content-length (4bytes) | ... content |
      *
      *  v.1.0.1
      * | magic-number (4bytes) | ver (1byte) | serial-algorithm (1byte) | cmd (1byte) | encoding (1byte)| content-length (4bytes) | ... content |
      * @date 2020/1/10 16:07
      */
      @Override
      public void encode(ByteBuf byteBuf, Pac pac) {

          byteBuf.writeInt(Serializer.MAGIC_NUM);

          byteBuf.writeByte(Serializer.VER);

          byteBuf.writeByte(serializer.algo());

          byteBuf.writeByte(pac.command());

          // encoding.
          byteBuf.writeByte(pac.getEncoding());

          final byte[] bytes;
          try {

              bytes = serializer.serialize(pac, ENCODING_HOLDER.get(pac.getEncoding()));
              byteBuf.writeInt(bytes.length);
              byteBuf.writeBytes(bytes);

          } catch (UnsupportedEncodingException e) {
              log.error("SIM packet encoding error.", e);
          }

      }
Codec handler - PacCodecHandler

net.smarttax.sim.handler.PacCodecHandler

@PacHandler
@Slf4j
@ChannelHandler.Sharable
public class PacCodecHandler extends MessageToMessageCodec<ByteBuf, Pac> {

    private final PacCompiler pacCompiler;

    public PacCodecHandler(PacCompiler pacCompiler) {
        this.pacCompiler = pacCompiler;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Pac pac, List<Object> list) throws Exception {
        final ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
        pacCompiler.encode(byteBuf,pac);

        list.add(byteBuf);
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        list.add(pacCompiler.decode(byteBuf));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("PacCodecHandler occurs error.",cause);
        super.exceptionCaught(ctx, cause);
    }

}

SIM-SERVER

.
├── SimServerApplication.java
├── annotation
├── aspect
├── conf
├── controller
├── entity
├── exception
├── handler
├── mapper
└── service

The server acts as a business gateway: it maintains connections, parses command packets, proxies requests to the real business services, and pushes the results back.

Server initializer - ServerInitializer

net.smarttax.sim.conf.ServerInitializer

@PostConstruct
private void init() {

    boss = new NioEventLoopGroup(0,new DefaultThreadFactory("boss"));
    work = new NioEventLoopGroup(0,new DefaultThreadFactory("worker"));

    final ServerBootstrap  bootstrap = new ServerBootstrap();

    // for business thread isolation.
    operationExecutor = new UnorderedThreadPoolEventExecutor(16, new DefaultThreadFactory("operation-executor"));

    bootstrap.group(boss, work)
            .channel(NioServerSocketChannel.class)
            .option(NioChannelOption.SO_BACKLOG,1000) // size the accept queue for bursts of incoming connections.
            .childOption(NioChannelOption.TCP_NODELAY,true) // disable Nagle's algorithm for lower latency.
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline()
                            .addLast(new DropletIdleStateHandler())
                            .addLast(new LengthSpliter())
                            .addLast(pacCodecHandler)
                            .addLast(authHandler)
                            .addLast(heartBeatReqHandler)
                            .addLast(authManageHandler)
                            .addLast(operationExecutor,operationReqHandler);
                }
            });

    bootstrap.bind(PORT)
            .addListener(f -> {
                if (f.isSuccess()) {
                    log.info("SIM has started on port {} .", PORT);
                }
            });
}
  • bootstrap.group(boss, work) builds the main-sub Reactor model
  • .channel(NioServerSocketChannel.class) selects the NIO transport
  • .option(NioChannelOption.SO_BACKLOG,1000) sizes the accept queue so a burst of simultaneous connections is not dropped
  • .childOption(NioChannelOption.TCP_NODELAY,true) disables Nagle-style socket buffering, trading IO frequency for lower latency
  • .childHandler(new ChannelInitializer<NioSocketChannel>() { builds the per-connection handler pipeline
  • Start the server listening on the configured port:
   bootstrap.bind(PORT)
            .addListener(f -> {
                if (f.isSuccess()) {
                    log.info("SIM has started on port {} .", PORT);
                }
            });
Idle-detection handler - DropletIdleStateHandler

net.smarttax.sim.handler.DropletIdleStateHandler

@Slf4j
public class DropletIdleStateHandler extends IdleStateHandler {

    private static final int IDLE_TIME = 90;

    public DropletIdleStateHandler() {
        super(IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt)  {
        log.info("connection [{}] close for no heartbeat.",ctx.channel().toString());
        ctx.channel().close();
    }

}
  • Configure the idle criteria (all idle / read idle / write idle)
  • Listen for idle events
Frame splitting - LengthSplicer

Extends io.netty.handler.codec.LengthFieldBasedFrameDecoder.

Frames packets using the length field defined by the wire protocol, delegating mostly to Netty's stock length-based framing.

public class LengthSplicer extends LengthFieldBasedFrameDecoder {

    public LengthSplicer() {
        super(Integer.MAX_VALUE, PacCompiler.SPLITER_OFFSET, PacCompiler.SPLITER_LENGTH);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        return super.decode(ctx, in);
    }
}
Second-stage codec - PacCodecHandler

Extends io.netty.handler.codec.MessageToMessageCodec.

@PacHandler
@Slf4j
@ChannelHandler.Sharable
public class PacCodecHandler extends MessageToMessageCodec<ByteBuf, Pac> {

    private final PacCompiler pacCompiler;

    public PacCodecHandler(PacCompiler pacCompiler) {
        this.pacCompiler = pacCompiler;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Pac pac, List<Object> list) throws Exception {
        final ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
        pacCompiler.encode(byteBuf,pac);

        list.add(byteBuf);
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        list.add(pacCompiler.decode(byteBuf));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("PacCodecHandler occurs error.",cause);
        super.exceptionCaught(ctx, cause);
    }

}
  • MessageToMessageCodec extends io.netty.channel.ChannelDuplexHandler, Netty's combined ChannelInboundHandler/ChannelOutboundHandler, so it fires on both inbound and outbound traffic; a natural fit for codec logic

  • Its generic parameters pin down the inbound and outbound types, which makes implementing a second-stage codec more convenient

  • MessageToMessageCodec automatically release()s the ByteBuf it encodes/decodes (see the pass-through sketch below)

 * Be aware that you need to call {@link ReferenceCounted#retain()} on messages that are just passed through if they
 * are of type {@link ReferenceCounted}. This is needed as the {@link MessageToMessageCodec} will call
 * {@link ReferenceCounted#release()} on encoded / decoded messages.  
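So a handler that passes the original ByteBuf through unchanged must retain() it first to compensate for that automatic release; a minimal sketch:

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

public class PassThroughDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.retain()); // balance the release() the codec performs afterwards
    }
}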
Auth packet handler - AuthReqHandler

Right after the connection is established, the client sends an auth packet carrying its credentials. The auth handler then:

  • validates the credentials
  • persists a connection record
  • caches the connection instance
  • returns the connection result to the client, or actively closes the connection on authentication failure
 @Override
    protected void channelRead0(ChannelHandlerContext ctx, AuthReqPac authReqPac) throws Exception {

        final AuthInfo authInfo = Optional.ofNullable( connectionService.queryUserByKey(authReqPac.getAk(), authReqPac.getSk()))
                .orElseThrow(() -> new DropletException(DropletExceptionEnum.AUTHORIZATION_FAIL));

        InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();

        // 1. connection recording.
        final Connection connection = Connection.builder()
                .cid(CommonUtils.buildUUID16())
                .ak(authInfo.getAk())
                .authCode(authInfo.getAuthCode())
                .appCode(authInfo.getAppCode())
                .appName(authInfo.getAppName())
                .host(socketAddress.getAddress().getHostAddress())
                .port(socketAddress.getPort())
                .build();

        this.cacheAndRecord(connection,ctx.channel());

        // 2. authorization granting.
        authDetailService.refreshAuthList(authInfo);

        AuthRespPac respPac = AuthRespPac.builder()
                .connection(connection)
                .isConnected(true)
                .build();

        ctx.channel().writeAndFlush(respPac);

        log.info("app [{}] build connection succeed.",authInfo.getAppName());
    }

Caching the connection instance is handled by net.smarttax.sim.util.DropletChannelContext:

Internally it maintains a connection container, private static final Map<String, Channel> CHANNEL_HOLDER = new ConcurrentHashMap<>(64);, so connection state can be managed globally. A minimal sketch of such a registry follows.
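Only the CHANNEL_HOLDER declaration is quoted from the source; the attribute key and method names below are illustrative:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

public final class ChannelRegistry {
    private static final Map<String, Channel> CHANNEL_HOLDER = new ConcurrentHashMap<>(64);
    private static final AttributeKey<String> CID_KEY = AttributeKey.valueOf("droplet.cid");

    public static void bind(String cid, Channel channel) {
        channel.attr(CID_KEY).set(cid);          // channel -> cid lookup
        CHANNEL_HOLDER.put(cid, channel);        // cid -> channel lookup
        channel.closeFuture().addListener(f -> CHANNEL_HOLDER.remove(cid)); // self-cleaning on close
    }

    public static boolean isConnected(Channel channel) {
        String cid = channel.attr(CID_KEY).get();
        return cid != null && CHANNEL_HOLDER.containsKey(cid);
    }
}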

Heartbeat handler - HeartBeatReqHandler

The heartbeat mechanism

Heartbeats are driven by the SIM client, so the server only needs to answer each incoming PING with a heartbeat acknowledgment. Since PING packets are stateless by design, the handler can be a shared singleton.

@Slf4j
@PacHandler
@ChannelHandler.Sharable
public class HeartBeatReqHandler extends SimpleChannelInboundHandler<PingPac> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PingPac msg) throws Exception {
        ctx.writeAndFlush(PongPac.INSTANCE());
    }
}
Auth management handler - AuthManageHandler
  • Checks whether the current connection is valid and closes it if not
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    if(!DropletChannelContext.isConnected(ctx.channel())){
        log.info("connection [{}] hasn't authorize yet.",ctx.channel().toString());
        ctx.channel().close();
        return;
    }

    ctx.pipeline().remove(this);
    super.channelRead(ctx, msg);
}
  • TCP is connection-oriented, so once a connection has authenticated there is no need to re-check on every received message; after the connection is deemed valid, the auth management handler removes itself from the pipeline
   @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        log.info("Authorization processor has been removed.");
    }

Listening for the handlerRemoved event verifies that the auth management handler was indeed removed.

Command handler - OperationReqHandler

Messages that survive the chain finally reach this handler, where the real business command is parsed.

@PacHandler
@Slf4j
@ChannelHandler.Sharable
public class OperationReqHandler extends SimpleChannelInboundHandler<OperationPac> {

    private final OperationHandlerManager optManager;

    public OperationReqHandler(OperationHandlerManager optManager) {
        this.optManager = optManager;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, OperationPac operationPac) {

        Connection conn = DropletChannelContext.getConn(ctx.channel());
        operationPac.setAuthCode(conn.getAuthCode());
        operationPac.setCurrentConnection(conn);

        OperationRespPac resp;
        try{
            resp = optManager.invoke(operationPac);
        }catch (DropletException e){

            log.error("Droplet proccesor chain occurs an error: {}",e.getMsg());
            // catch exception from processor chain.
            resp = new OperationRespPac();
            resp.setOperationStatus(OperationStatus.FAIL);
            resp.setOperation(operationPac.opt());
            resp.setSid(operationPac.getSid());
            resp.setMsg(e.getMessage());
        }

        ctx.channel().writeAndFlush(resp);

    }
}    

After obtaining the connection, the packet and its command are delegated to net.smarttax.sim.service.manager.OperationHandlerManager

  • Real business processing is classic IO-intensive work, so the OperationHandler gets a dedicated thread pool, isolating it from the NIO worker EventLoopGroup
 // for business thread isolation.
        operationExecutor = new UnorderedThreadPoolEventExecutor(16, new DefaultThreadFactory("operation-executor"));
        ...
  .addLast(operationExecutor,operationReqHandler);
Command dispatch manager - OperationHandlerManager

This is the entry point of DropletServer's business processing and the boundary of the socket layer; to decouple connection handling from business handling, the final business packets are processed in this separate layer. The manager is in fact also a chooser, acting as a strategy selector over business channels.

Core logic:

  • Build the channel-handler container:

Essentially: Map<business channel, Map<method name, method signature>>

 private final Map<String,Map<String,Class<?>[]>> METHODS_SIGNATURE_HOLDER = new ConcurrentHashMap<>(32);
   @PostConstruct
    @Override
    public void init(){

        Arrays.stream(OperationChannel.values())
                .forEach(ch -> {
                    OperationService<?,?> optService = ctx.dispatch(ch);
                    if(optService == null){
                        return;
                    }
                    final Class<?> targetClazz = this.getTargetClazz(optService);
                    METHODS_SIGNATURE_HOLDER.put(ch.toString()
                            ,Stream.of(targetClazz.getDeclaredMethods())
                            .filter(m -> ! CommonUtils.isInterfaceMethod(m.getModifiers()))
                            .collect(collectingAndThen(toMap(Method::getName,Method::getParameterTypes)
                                    ,ImmutableMap::copyOf)));
                });

    }
  • The operation manager then executes:

    • parse the command packet
    • select the business channel
    • invoke the target method via reflection
@Override
public OperationRespPac invoke(OperationPac optPac){

    final Operation opt = optPac.opt();
    final OperationService<?,?> targetService = ctx.dispatch(opt.getChannel());// TODO configure on runtime.

    try {
        Class<?> proxyClazz = this.getTargetClazz(targetService);
        Class<?>[] paramClazz = METHODS_SIGNATURE_HOLDER
                .get(opt.getChannel().toString())
                .get(opt.getDesc());

       return  (OperationRespPac) proxyClazz.getMethod(opt.getDesc(),paramClazz)
                .invoke(targetService, optPac);

    } catch (Exception e) {
        log.error("OperationHandlerManager proxy invoker occurs an error.",e);
        return this.buildErrorResp(optPac,e);
    }

}
DropletContextProcessorManager and DropletBaseProcessor

Netty's layered ChannelPipeline works so well at the connection stage that the business stage gets its own processing chain, DropletBaseProcessor, built on Spring AOP; essentially strategy + proxy + chain of responsibility.

  • DropletContextProcessorManager maintains the DropletBaseProcessor chains keyed by execution timing (TriggerOn)
private Map<TriggerOn, List<DropletBaseProcessor>> CONTEXT_PROCESSOR_HOLDER;

@Override
public void init() {

    if (CollectionUtils.isEmpty(CONTEXT_PROCESSOR_HOLDER)) {
        return;
    }

    CONTEXT_PROCESSOR_HOLDER.forEach((t, p) -> {
        // sort each processor chain by its order
        p.sort(Comparator.comparing(DropletBaseProcessor::getOrder));
    });

}
  • At invocation time the chain runs in sorted order; the concrete processor implementations determine the effect
  @Override
    public void invoke(TriggerOn triggerOn,Object param) throws DropletException {

        List<DropletBaseProcessor> processorChain = CONTEXT_PROCESSOR_HOLDER.get(triggerOn);

        if(CollectionUtils.isEmpty(processorChain)){
            return;
        }
        // execute in order
        for (DropletBaseProcessor processor : processorChain
        ) {
            try {
                processor.invoke(param);
            } catch (Exception e) {
                log.error("Droplet-Context processor processing occurs an error: {}", e.getMessage());
                if (processor.isSuspend()) {
                    // honor the suspend flag and abort the chain
                    processor.doSuspend(e);
                }
            }

        }
    }
  • Operation authorization processor - OperationAuthorizationProcessor

A typical DropletBaseProcessor implementation:

@DropletProcessor(triggerOn = TriggerOn.BEFORE,order = 0,suspend = true)
public class OperationAuthorizationProcessor extends DropletBaseProcessor {

    private final AuthDetailService authDetailService;

    public OperationAuthorizationProcessor(AuthDetailService authDetailService) {
        this.authDetailService = authDetailService;
    }

    @Override
    public void doSuspend(Exception e) throws DropletException {
        if(e instanceof DropletException){
           throw (DropletException) e;
        }
    }

    @Override
    public void invoke(Object object) {

           ...
       }
  • Trigger timing: before the proxied method runs
  • Order: position 0
  • Suspend flag: true, meaning an exception here interrupts the whole processor chain
The final multi-channel business implementation - OperationService

image.png

The concrete business code:

@OperationHandler(optChannel = OperationChannel.DEVICE_PLATE)
@Slf4j
public class DevicePlateServiceImpl extends DevicePlateOperationBaseService{

    private final TaxCoreRemoteClient taxCoreRemoteClient;

    private final TaxCoreESignRemoteClient taxCoreESignRemoteClient;

    public DevicePlateServiceImpl(TaxCoreRemoteClient taxCoreRemoteClient, TaxCoreESignRemoteClient taxCoreESignRemoteClient) {
        this.taxCoreRemoteClient = taxCoreRemoteClient;
        this.taxCoreESignRemoteClient = taxCoreESignRemoteClient;
    }

    @Override
    public OperationRespPac blanks(DevicePlateReqPac req) {

        // TODO Request parser adapter.

        final BlanksQueryReq query = BlanksQueryReq.builder()
                .invType(Optional.ofNullable(req.getInvoiceType()).orElse(InvoiceType.ALL).getCode())
                .build();

        final TaxCoreReqDTO<BlanksQueryReq> reqDTO = new TaxCoreReqDTO<>(super.buildBaseQuery(req),query);
        reqDTO.setData(query);
        reqDTO.setQuery(super.buildBaseQuery(req));

        final OperationRespPromise promise = new OperationRespPromise();
        OperationPromiseContext.register(req.getSid(),promise);

        final TaxCoreRespDTO<Boolean> respDTO = taxCoreRemoteClient.listBlanks(reqDTO);

        return pendingResp(req,Operation.DEVICE_PLATE_BLANKS,promise,respDTO, BlanksQueryResp.class);
    }

SIM-CLIENT

.
├── SimClientApplication.java
├── annotation
│   ├── EnableSmarttaxClient.java
│   └── SmarttaxRespListener.java
├── command
│   ├── AuthorizationCommand.java
│   ├── CommandManager.java
│   ├── ConsoleCommand.java
│   ├── DefaultCommand.java
│   ├── DemoBlanksCommand.java
│   ├── DemoEInvoiceCommand.java
│   ├── DemoEInvoiceQueryCommand.java
│   ├── PlateBlanksCommand.java
│   ├── PlateInvoiceQueryCommand.java
│   ├── PlateInvoiceReversingCommand.java
│   ├── PlateInvoicingCommand.java
│   └── ShutDownCommand.java
├── conf
│   ├── ClientInitializer.java
│   ├── DropletConf.java
│   └── SmarttaxListenerRegister.java
├── facade
│   ├── SmarttaxAsyncService.java
│   ├── SmarttaxClient.java
│   └── SmarttaxOperationListener.java
└── handler
    ├── AuthRespHandler.java
    ├── DropletIdleStateHandler.java
    ├── HeartBeatHandler.java
    ├── OperationErrorRespHandler.java
    ├── OperationRespHandler.java
    ├── OperationRespHandlerManager.java
    ├── OperationRespManager.java
    └── PongRespHandler.java
Client initializer - ClientInitializer
 @PostConstruct
    private void init() {

        loopGroup = new NioEventLoopGroup();

        bootstrap = new Bootstrap();
        bootstrap.group(loopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new DropletIdleStateHandler(ctx.getBean(ClientInitializer.class),dropletConf))
                                .addLast(new LengthSplicer())
                                .addLast(pacCodecHandler)
                                .addLast(authRespHandler)
                                .addLast(operationRespHandler)
                                .addLast(operationErrorRespHandler)
                                .addLast(new HeartBeatHandler());
                    }
                });

        this.connectWithRetry(dropletConf.getRetry());
    }

Differences from the server-side setup:

  • bootstrap = new Bootstrap(); the client/server difference: the client only watches OP_CONNECT / OP_WRITE / OP_READ events
  • loopGroup = new NioEventLoopGroup(); a single NioEventLoopGroup suffices, since the client only manages its own connection and never listens for OP_ACCEPT
  • In production we want the client to notice network jitter or a server shutdown/upgrade and reconnect automatically:
public void connectWithRetry(int retry) {
     bootstrap.connect(dropletConf.getEndpoint(),dropletConf.getPort())
            .addListener(f -> {
                if(f.isSuccess()){
                    log.info("connection succeeded.");
                    Channel channel = ((ChannelFuture) f).channel();

                    smarttaxClient.refreshChannel(channel);

                }else if(retry == 0){
                    connectWithRetry(dropletConf.getRetry());
                }else {
                    // current round.
                    int currentRound = (dropletConf.getRetry() - retry) + 1;
                    // current delay.
                    int delay =  1 << currentRound;

                    log.info("retry to get connection, round: [{}].",currentRound);
                    bootstrap.config()
                            .group()
                            .schedule(() -> connectWithRetry(retry - 1),delay, TimeUnit.SECONDS);
                }
            });

}
  • The async connect listener retries failed connections with exponential backoff (a trade-off between protecting resources and responding promptly)
  • The bootstrap's schedule() timer drives the recursive retry; the delay grows exponentially with currentRound
Operation response handler - OperationRespHandler

Received operation responses are delegated to net.smarttax.sim.handler.OperationRespHandlerManager

@PacHandler
@Slf4j
@ChannelHandler.Sharable
public class OperationRespHandler extends SimpleChannelInboundHandler<OperationRespPac> {

    private final OperationRespManager handlerManager;

    public OperationRespHandler(OperationRespManager handlerManager) {
        this.handlerManager = handlerManager;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, OperationRespPac operationPac)  {
        handlerManager.invoke(operationPac);
    }
}
Response dispatch manager - OperationRespHandlerManager

Its core job is to hand the final result to the user's net.smarttax.sim.facade.SmarttaxOperationListener implementation.

 @Override
    public void invoke(OperationRespPac respPac) {

        if(CollectionUtils.isEmpty(listeners)){
            log.info("Having no listener been registered yet.");
            return;
        }

        listeners.forEach((name, listener) -> {
            if (!(listener instanceof net.smarttax.sim.facade.SmarttaxOperationListener)) {
                log.warn("Listener implementation: net.smarttax.sim.facade.SmarttaxOperationListener");
                return;
            }

            try {
                final Method targetMethod = listener.getClass().getDeclaredMethod(respPac.getOperation().getDesc(), RespDTO.class);
                targetMethod.invoke(listener,buildRespDTO(respPac, targetMethod));

            } catch (NoSuchMethodException e) {
                log.warn("No such method reference: [{}]", respPac.getOperation().getDesc());

            } catch (Exception e) {
                log.error("Listener invoking failed.", e);

            }

        });
    }
Client instance - SmarttaxClient
  • Holds the connection instance and the configuration object
  • Wraps the business operation methods
@Slf4j
public final class SmarttaxClient implements SmarttaxAsyncService {

    private Channel channel;

    private final DropletConf dropletConf;

    public SmarttaxClient(DropletConf dropletConf) {
        this.dropletConf = dropletConf;
    }

    public void refreshChannel(Channel channel)  {
        this.channel = channel;
        this.auth(channel);
    }
    ...
}
Operation response listener interface - SmarttaxOperationListener

Abstract listener methods and parameter types, for the user to implement:

public interface SmarttaxOperationListener {

     /**
     * @Description: query blank invoices
     * @Param:
     * @return:
     * @Author: fyang
     */
     void blanks(RespDTO<List<InvoiceBlankEntity>> results);

     /**
     * @Description: invoice repair
     * @Param:
     * @return:
     * @Author: fyang
     */
     void eInvoiceQuery(RespDTO<List<InvoiceQueryEntity>> results);

    ...
Heartbeat handler - HeartBeatHandler
  • Listens for channelActive (connection established), then delegates heartbeat sending to a scheduled task that fires every 30s; the matching server-side idle timeout is 90s.
@Slf4j
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    private static final int HEARTBEAT_INTERVAL = 30;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.heartbeat(ctx);
        super.channelActive(ctx);
    }

    private void heartbeat(ChannelHandlerContext ctx){

        ctx.executor().schedule(() -> {
            if(ctx.channel().isActive()){
                ctx.channel().writeAndFlush(PingPac.INSTANCE());
                heartbeat(ctx);
            }
        },HEARTBEAT_INTERVAL, TimeUnit.SECONDS);

    }
}

SIM-CLIENT-STARTER

Spring Boot support

.
└── conf
    ├── DropletClientAutoConfiguration.java
    └── DropletClientImporter.java
Spring Boot auto-configuration
@Configuration
@ConditionalOnProperty(name = "smarttax.droplet.enable",havingValue = "true")
@ConditionalOnResource(resources = "META-INF/spring.factories")
@ComponentScan(basePackages = {"net.smarttax.sim"})
public class DropletClientAutoConfiguration implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Bean
    public SmarttaxClient smarttaxClient()  {

        final DropletConf conf = applicationContext.getBean(DropletConf.class);

        return new SmarttaxClient(conf);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
spring.factories

Spring's flavor of SPI, used to register beans living outside the application's own jar with the bean container:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
net.smarttax.sim.starter.conf.DropletClientAutoConfiguration

Data in transit

Data travels the network in binary, layer by layer, and each layer "wraps" it differently:

image.png

  • Layer 2 splits data into frames image.png
  • Network layer: packets image.png
  • Transport layer: segments

image.png

  • Application layer: each protocol defines its own unit

image.png

First-stage codec

The layers below the application layer do not care how the application splits its business data. If the application sends the strings "Netty" / "In" / "Action", TCP's byte-stream transport may deliver them re-split as "Nett" / "yInA" / "ction": the classic "sticky packet / half packet" problem.

What causes sticky and half packets:

  • The TCP socket buffers [^tcp_and_buffer]

In TCP socket programming, sending and receiving data is a cooperation between three queues:

  • SendBuffer: the write buffer
  • RecvBuffer: the read buffer
  • Delivered: bytes the application has already read

![image.png](https://img.check321.net/20210415/upload_44f421b5e2916ebc0fe0c3edc03a27d1.png)

That is, a socket write does not transmit immediately: the data first enters SendBuffer and is flushed to the NIC later. The reverse holds as well: the NIC first fills RecvBuffer, and the application reads from it only once certain conditions are met.

When a complete application-level packet is larger than SendBuffer, it is split across sends and the receiver sees a "half packet". When it is smaller than SendBuffer, the socket may merge several packets into one send to reduce transmissions, and the receiver sees a "sticky packet". image.png

  • MTU (maximum transmission unit): the largest unit the link will transmit[^what_is_mtu]

    • Checking the MTU:

      ➜ ~ networksetup -getMTU en0
      Active MTU: 1280 (Current Setting: 1280)

  • The MTU is a network-layer concept: during IP routing, each device checks the next hop's MTU and fragments (fragmentation) any packet that exceeds it.
  • MSS (maximum segment size): the largest data payload per segment[^what_is_mss]

    • MSS is a transport-layer concept: the maximum amount of data the transport layer packs into one segment. Every packet also carries the network-layer IP header and the transport-layer TCP header, so MSS follows from MTU:

    • MSS = MTU - (IPHeader + TCPHeader)

    • Unlike MTU overflow, a packet exceeding the MSS is simply dropped

      image.png
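As a worked example under common Ethernet defaults (assumed here, not stated above): with an MTU of 1500 bytes, a 20-byte IPv4 header and a 20-byte TCP header, MSS = 1500 - (20 + 20) = 1460 bytes.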

Netty's support for the first-stage codec

Netty ships many ready-to-use first-stage codecs, all built on io.netty.handler.codec.ByteToMessageDecoder.

  • io.netty.handler.codec.FixedLengthFrameDecoder

Fixed-length decoding: every frame uses one uniform length. Simple to implement, but demanding on the use case, since every application packet occupies the same space regardless of its actual size.

/**
 * A decoder that splits the received {@link ByteBuf}s by the fixed number
 * of bytes. For example, if you received the following four fragmented packets:
 * <pre>
 * +---+----+------+----+
 * | A | BC | DEFG | HI |
 * +---+----+------+----+
 * </pre>
 * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
 * following three packets with the fixed length:
 * <pre>
 * +-----+-----+-----+
 * | ABC | DEF | GHI |
 * +-----+-----+-----+
 * </pre>
 */
  • io.netty.handler.codec.DelimiterBasedFrameDecoder

Delimiter-based decoding: frames split on a designated delimiter. Transfer size can be controlled as needed, but it does not suit payloads with open-ended content.

 * <pre>
 * +--------------+
 * | ABC\nDEF\r\n |
 * +--------------+
 * </pre>
 * a {@link DelimiterBasedFrameDecoder}({@link Delimiters#lineDelimiter() Delimiters.lineDelimiter()})
 * will choose {@code '\n'} as the first delimiter and produce two frames:
 * <pre>
 * +-----+-----+
 * | ABC | DEF |
 * +-----+-----+
 * </pre>
 * rather than incorrectly choosing {@code '\r\n'} as the first delimiter:
 * <pre>
 * +----------+
 * | ABC\nDEF |
 * +----------+
 * </pre>
 */
  • io.netty.handler.codec.LengthFieldBasedFrameDecoder

Length-field decoding: the protocol reserves a field that states the content length. This generalizes to almost any scenario, so it is the codec usually chosen; the sketch after the diagram applies it to the SIM protocol.

 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+
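Applied to the SIM v1.0.1 protocol sketched earlier, the constructor arguments fall straight out of the header layout. A sketch (the offsets are derived from the protocol comment, not copied from PacCompiler's actual constants):

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

// | magic(4) | ver(1) | serial-algo(1) | cmd(1) | encoding(1) | content-length(4) | content |
public class SimFrameDecoder extends LengthFieldBasedFrameDecoder {
    private static final int LENGTH_FIELD_OFFSET = 4 + 1 + 1 + 1 + 1; // bytes before the length field
    private static final int LENGTH_FIELD_LENGTH = 4;                 // content-length is an int

    public SimFrameDecoder() {
        // maxFrameLength, lengthFieldOffset, lengthFieldLength
        super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH);
    }
}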

Plus codecs supporting many established protocols:

image.png

Second-stage codec

With sticky/half packets solved, one gap remains: high-level languages deal in "objects" while the network deals in byte streams, so the second-stage codec is simply object serialization/deserialization.

  • JSON
  • Protobuf
  • Marshalling
  • Hessian

...

Each has its trade-offs; choose according to your actual constraints. A minimal JSON sketch follows.
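As an illustration, a minimal JSON-based second-stage codec in the spirit of the project's JSONSerializer; the method shapes are inferred from the calls seen earlier (serializer.algo(), serializer.serialize(pac, charset)) and the algorithm id is made up:

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public byte algo() {
        return 1; // illustrative algorithm id written into the header's serial-algorithm byte
    }

    public byte[] serialize(Object pac, String charset) throws UnsupportedEncodingException {
        try {
            return MAPPER.writeValueAsString(pac).getBytes(charset); // object -> bytes
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("JSON encoding failed", e);
        }
    }

    public <T> T deserialize(byte[] bytes, Class<T> clazz, String charset) throws IOException {
        return MAPPER.readValue(new String(bytes, charset), clazz);  // bytes -> object
    }
}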

Heartbeats

Dead connections
  • Cope with the messy public internet and keep connections usable (a dead connection blocks the client's sends and receives)
  • Keep resource usage sensible (masses of dead connections pin sockets open)
Heartbeat probing
  • The client periodically sends a heartbeat packet to the server; the server acknowledges it, completing one probe
  • Client and server agree on a tolerance threshold (Idle); if no heartbeat arrives within the idle window, the connection is judged dead
Heartbeat Idle
  • Client idle: reconnect (best effort to keep a usable connection)
  • Server idle: close the connection (reduce server-side resource usage)
  • Given public-network jitter, the idle window should exceed the heartbeat interval (e.g. MQTT's default tolerance is 1.5x the heartbeat period)

Summary

  • Know the IO models, and how an IO model determines a service's performance
  • The Reactor models implementing NIO and their variants: single Reactor single thread, single Reactor multithreaded, main-sub Reactor multithreaded
  • Netty fundamentals
  • The basic building blocks of an IM-style service
  • Analyze problems from a practical standpoint: let theory guide practice, and let practice test theory

  1. 美团技术-NIO (Meituan Tech: NIO)

[^scalable_in_io]: Scalable IO In Java

[^alibaba_database_team]: Improving Redis Performance through Multi-Thread Processing

[^tcp_and_buffer]: TCP/IP Sockets in Java, 6.1 TCP and buffering

本文链接:https://check321.net/post/from_nio_to_netty_00.html

-- EOF --
