# 《分布式IM系统》即时通讯后端服务-第03节:即时通讯后端服务通用ChannelHanler的设计与实现

作者:冰河
星球:http://m6z.cn/6aeFbs (opens new window)
博客:https://binghe.gitcode.host (opens new window)
文章汇总:https://binghe.gitcode.host/md/all/all.html (opens new window)
源码获取地址:https://t.zsxq.com/0dhvFs5oR (opens new window)
课程视频:https://t.zsxq.com/15cCvfYNv (opens new window)

沉淀,成长,突破,帮助他人,成就自我。

  • 本节难度:★★☆☆☆
  • 本节重点:对即时通讯后端服务的通用数据收发ChannelHandler进行设计和实现,掌握Netty中ChannelPipeline的内部结构、事件传播机制、异常传播机制,并能够将ChannelHandler的设计和实现灵活应用到自身实际项目中。
  • 课程视频:https://t.zsxq.com/15cCvfYNv (opens new window)

大家好,我是冰河~~

Netty的性能是非常高的,很多通信框架底层都使用了Netty,像Dubbo框架底层网络通信就使用了Netty,我们在《RPC手撸专栏》中,自己手写的RPC框架底层网络通信也使用了Netty。Netty性能非常高,主要体现在IO线程模型、内存零拷贝、内存池设计、串形化处理读写、高性能序列化协议等方面。

# 一、前言

在前面的章节中,我们已经完成即时通讯后端服务的通用代码设计和实现,并对自定义的编解码器进行了设计和实现。期间,不仅介绍了如何基于Netty自定义编解码器,还介绍了自定义通信协议,Netty中的编码器类和解码器类,让大家对Netty本身提供的编解码器类有个直观的认识。本节,就需要对收发数据的ChannelHandler进行设计和实现。

# 二、本节诉求

本节,对Netty中ChannelPipeline的内部结构、事件传播机制、异常传播机制进行简单的介绍,旨在让大家从整体上了解Netty内部的流水线模式,基于流水线模式实现的事件传播机制和异常传播机制。在此基于上实现自定义的数据收发逻辑。

# 一、ChannelPipeline的内部结构

ChannelPipeline主要是基于流水线模式实现的一个Netty的的编排组件,负责调度Netty中的各种ChannelHandler,对数据的实际处理还是由ChannelHandler完成。ChannelPipeline内部通过双向链表将不同的ChannelHandler整合在一起,如果有Channel请求到来,ChannelPipeline会依次调用链表中的ChannelHandler来处理数据。如图3-1所示。


可以看到,每个Channel会绑定一个ChannelPipeline,每个ChannelPipeline内部会有多个ChannelHandlerContext,所有的ChannelHandlerContext会由一个链表进行连接。

根据数据在Netty中的流向,ChannelPipeline会分为入站ChannelInboundHandler和出站 ChannelOutboundHandler两种处理器。如果数据是由客户端向服务端发送,就叫做出站,如果是由服务端向客户端发送数据,就叫做入站。入站操作是由InboundHandler处理,而出站操作是由OutboundHandler处理。并且入站是需要经过Decoder解码器处理,而出站是由Encoder编码器处理。

HeadContext 既是 Inbound 处理器,也是 Outbound 处理器。它分别实现了 ChannelInboundHandler和ChannelOutboundHandler,整个写数据的入口就是由HeadContext完成的,HeadContext是ChannelInboundHandler调用链的第一步,也是ChannelOutboundHandler调用链的最后一步,当HeadContext接收到数据后,经过ChannelPipeline中各个ChannelHandler的处理后,最终又会由HeadContext向外写数据。

TailContext 只实现了ChannelInboundHandler 接口,它是ChannelInboundHandler调用链的最后异步,会终止InboundHandler的事件,并且TailContext 是ChannelOutboundHandler调用链的第一步,会将事件传递给上一个节点。

# 二、事件传播机制

Netty的ChannelPipeline中包含的处理器可以分为ChannelInboundHandler入站处理器和ChannelOutboundHandler出站处理器。对应的时间也会分成入站(Inbound)事件和出站(Outbound)事件。

这里,我们写几个基于Netty的代码片段来介绍Netty的事件传播机制。

  • 入站(Inbound)事件示例
public class InBoundHandlerExample extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;
    public InBoundHandlerExample(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (flush) {
            ctx.channel().writeAndFlush(msg);
        } else {
            super.channelRead(ctx, msg);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  • 出站(Outbound)事件示例
public class OutBoundHandlerExample extends ChannelOutboundHandlerAdapter {
    private final String ;
    public OutBoundHandlerExample(String name) {
        this.name = name;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception 
        super.write(ctx, msg, promise);
    }
}
1
2
3
4
5
6
7
8
9
10
11
  • 整合pipeline示例
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
                .addLast(new InBoundHandlerExample("InBoundHandlerA", false))
                .addLast(new InBoundHandlerExample("InBoundHandlerB", false))
                .addLast(new InBoundHandlerExample("InBoundHandlerC", true));
        ch.pipeline()
                .addLast(new OutBoundHandlerExample("OutBoundHandlerA"))
                .addLast(new OutBoundHandlerExample("OutBoundHandlerB"))
                .addLast(new OutBoundHandlerExample("OutBoundHandlerC"));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

相信这些代码小伙伴们都能看懂,那我们就基于这些代码介绍下Netty的事件传播机制。

在上述代码片段中,我们通过Netty的pipeline的addLast()方法分别添加了三个ChannelInboundHandler处理器和三个ChannelOutboundHandler处理器,添加后的事件处理器在ChannelPipeline内的结构如图3-2所示。

# 查看全文

加入冰河技术 (opens new window)知识星球,解锁完整技术文章与完整代码