# 《并发设计模式》第56章-流水线模式-流水线模式在Netty中的应用

作者:冰河
星球:http://m6z.cn/6aeFbs (opens new window)
博客:https://binghe.gitcode.host (opens new window)
文章汇总:https://binghe.gitcode.host/md/all/all.html (opens new window)
源码获取地址:https://t.zsxq.com/0dhvFs5oR (opens new window)

沉淀,成长,突破,帮助他人,成就自我。

  • 本章难度:★★☆☆☆
  • 本章重点:了解流水线模式的核心原理与使用场景,初步掌握流水线模式的应用场景,掌握流水线模式在Netty中的应用,能够初步结合自身项目实际场景思考如何将流水线模式灵活应用到自身实际项目中。

大家好,我是冰河~~

在Netty框架的设计中,使用了流水线模式,可以这么说,Netty中的ChannelPipeline是流水线模式的典型应用。小伙伴们彻底搞懂了Netty中的ChannelPipeline机制,Netty也就掌握一大半了。

# 一、ChannelPipeline的内部结构

ChannelPipeline主要是基于流水线模式实现的一个Netty的的编排组件,负责调度Netty中的各种ChannelHandler,对数据的实际处理还是由ChannelHandler完成。ChannelPipeline内部通过双向链表将不同的ChannelHandler整合在一起,如果有Channel请求到来,ChannelPipeline会依次调用链表中的ChannelHandler来处理数据。如图56-1所示。


可以看到,每个Channel会绑定一个ChannelPipeline,每个ChannelPipeline内部会有多个ChannelHandlerContext,所有的ChannelHandlerContext会由一个链表进行连接。

根据数据在Netty中的流向,ChannelPipeline会分为入站ChannelInboundHandler和出站 ChannelOutboundHandler两种处理器。如果数据是由客户端向服务端发送,就叫做出站,如果是由服务端向客户端发送数据,就叫做入站。入站操作是由InboundHandler处理,而出站操作是由OutboundHandler处理。并且入站是需要经过Decoder解码器处理,而出站是由Encoder编码器处理。

HeadContext 既是 Inbound 处理器,也是 Outbound 处理器。它分别实现了 ChannelInboundHandler和ChannelOutboundHandler,整个写数据的入口就是由HeadContext完成的,HeadContext是ChannelInboundHandler调用链的第一步,也是ChannelOutboundHandler调用链的最后一步,当HeadContext接收到数据后,经过ChannelPipeline中各个ChannelHandler的处理后,最终又会由HeadContext向外写数据。

TailContext 只实现了ChannelInboundHandler 接口,它是ChannelInboundHandler调用链的最后异步,会终止InboundHandler的事件,并且TailContext 是ChannelOutboundHandler调用链的第一步,会将事件传递给上一个节点。

# 二、事件传播机制

Netty的ChannelPipeline中包含的处理器可以分为ChannelInboundHandler入站处理器和ChannelOutboundHandler出站处理器。对应的时间也会分成入站(Inbound)事件和出站(Outbound)事件。

这里,我们写几个基于Netty的代码片段来介绍Netty的事件传播机制。

  • 入站(Inbound)事件示例
public class InBoundHandlerExample extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;
    public InBoundHandlerExample(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (flush) {
            ctx.channel().writeAndFlush(msg);
        } else {
            super.channelRead(ctx, msg);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  • 出站(Outbound)事件示例
public class OutBoundHandlerExample extends ChannelOutboundHandlerAdapter {
    private final String ;
    public OutBoundHandlerExample(String name) {
        this.name = name;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception 
        super.write(ctx, msg, promise);
    }
}
1
2
3
4
5
6
7
8
9
10
11
  • 整合pipeline示例
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
                .addLast(new InBoundHandlerExample("InBoundHandlerA", false))
                .addLast(new InBoundHandlerExample("InBoundHandlerB", false))
                .addLast(new InBoundHandlerExample("InBoundHandlerC", true));
        ch.pipeline()
                .addLast(new OutBoundHandlerExample("OutBoundHandlerA"))
                .addLast(new OutBoundHandlerExample("OutBoundHandlerB"))
                .addLast(new OutBoundHandlerExample("OutBoundHandlerC"));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

相信这些代码小伙伴们都能看懂,那我们就基于这些代码介绍下Netty的事件传播机制。

在上述代码片段中,我们通过Netty的pipeline的addLast()方法分别添加了三个ChannelInboundHandler处理器和三个ChannelOutboundHandler处理器,添加后的事件处理器在ChannelPipeline内的结构如图56-2所示。


可以看到,当客户端向服务端发送请求时,就会触发InBoundHandlerExample中的channelRead()事件,当InBoundHandlerExample处理完后,在InBoundHandlerC中会调用writeAndFlush()方法向客户端写数据,此时就会触发OutBoundHandlerExample的write()事件。

并且在Netty中,Inbound事件和 Outbound事件的传播方向是不一样的。Inbound事件是由Head向Tail传播,而Outbound事件是由Tail向Head传播。

# 三、异常传播机制

# 查看全文

加入冰河技术 (opens new window)知识星球,解锁完整技术文章与完整代码