博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty源码之ChannelPipeline和ChannelHandlerContext
阅读量:6334 次
发布时间:2019-06-22

本文共 16886 字,大约阅读时间需要 56 分钟。

hot3.png

ChannelHandlerContext简介

ChannelHandlerContext的主要功能是使ChannelHandler能够与所属ChannelPipeline中的其他ChannelHandler进行交互,同时,使ChannelHandler可以在上游或下游传递事件,执行I/O操作,动态修改流水线或使用AttributeKeys存储ChannelHandler特有的信息等等。

先简单看看类图:

AbstractChannelHandlerContext为ChannelHandlerContext提供了很多默认实现;实现ChannelInboundInvoker和ChannelOutboundInvoker使得context可以传播入站事件和出站事件,实现ResourceLeakHint使得ChannelHandlerContext具备内存泄漏检测的能力。下面开始结合源码分析ChannelPipeline和ChannelHandlerContext的实现原理。

管理handler状态

AbstractChannelHandlerContext中使用handlerState来标识ChannelHandler的状态。

private volatile int handlerState = INIT;    /**     * ChannelHandler的handlerAdded(ChannelHandlerContext)方法即将被调用.     */    private static final int ADD_PENDING = 1;    /**     * ChannelHandler的handlerAdded(ChannelHandlerContext)方法调用完成     */    private static final int ADD_COMPLETE = 2;    /**     * ChannelHandler的handlerRemoved(ChannelHandlerContext)方法调用完成     */    private static final int REMOVE_COMPLETE = 3;    /**     * 初始状态     */    private static final int INIT = 0;

INIT:初始状态;

ADD_PENDING :用于标识handler添加的中间状态,表示handler已添加到pipeline链表上,但handlerAdded仍未调用。之所以有这个状态,是因为当ordered为false时,此状态的handler也可以处理事;ordered为true,意味着EventExecutor需要根据pipeline中已经添加的handler顺序处理事件;

ADD_COMPLETE :ChannelHandler的handlerAdded已经被调用,此状态的handler可以处理事件;

REMOVE_COMPLETE:ChannelHandler的handlerRemoved已经被调用,此状态的handler不能再处理事件。

状态转化:

INIT->ADD_PENDING 

方法setAddPending用于将handlerState从INIT转化为ADD_PENDING ,且setAddPending只会在addLast,addFirst,addAfter,addBefore等添加handler的方法中才可能被调用。进入源码:

final void setAddPending() {        boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);        assert updated;    }

以下两种情况下会调用setAddPending将handlerState设置为ADD_PENDING:

  • Channel未注册,将callHandlerAdded0封装在PendingHandlerCallback中待Channel注册后执行;
  • 当前线程不是channel注册时绑定的EventLoop,将callHandlerAdded0封装为Runnable任务提交给当前channel的EventLoop执行。

下面以DefaultChannelPipeline中addLast源码来分析:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {        final AbstractChannelHandlerContext newCtx;        synchronized (this) {            checkMultiplicity(handler);            newCtx = newContext(group, filterName(name, handler), handler);            addLast0(newCtx);            if (!registered) {                newCtx.setAddPending();                callHandlerCallbackLater(newCtx, true);                return this;            }            EventExecutor executor = newCtx.executor();            if (!executor.inEventLoop()) {                newCtx.setAddPending();                executor.execute(new Runnable() {                    @Override                    public void run() {                        callHandlerAdded0(newCtx);                    }                });                return this;            }        }        callHandlerAdded0(newCtx);        return this;    }

addLast的作用是将自定义handler添加到pipeline双向链表尾部,但是它不保证handler的handlerAdded方法会被立即调用。由源码可知,当registered为false时,调用setAddPending将handlerState设置为为ADD_PENDING,同时调用callHandlerCallbackLater封装task,用于Channel注册完成后调用callHandlerAdded0,在callHandlerAdded0中调用handlerAdded,并将handlerState设置为ADD_COMPLETE;如果Channel已经注册,即registered为true,但当前executor 不是channel注册时绑定的EventLoop,调用setAddPending将handlerState设置为为ADD_PENDING,同时将callHandlerAdded0封装在Runnable任务中提交给当前channel的EventLoop执行。

当Channel未注册时,使用PendingHandlerCallback来封装Channel注册完成后需要执行的任务,下面看实现:

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {        assert !registered;        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);        PendingHandlerCallback pending = pendingHandlerCallbackHead;        if (pending == null) {            pendingHandlerCallbackHead = task;        } else {            while (pending.next != null) {                pending = pending.next;            }            pending.next = task;        }    }

由callHandlerCallbackLater方法可知,ChannelPipeline维护了一个链表,使用PendingHandlerCallback作为节点,PendingHandlerCallback同时也是一个Runnable,可直接提交到EventLoop执行。pendingHandlerCallbackHead用于保存head节点的引用,方便后续遍历链表。

PendingHandlerCallback中维护了context和next节点的引用,并提供了抽象方法execute()供子类实现,主要PendingHandlerAddedTask和PendingHandlerRemovedTask两个实现,分别用于添加handler和移除handler。

下面以PendingHandlerAddedTask源码为例:

private abstract static class PendingHandlerCallback implements Runnable {        final AbstractChannelHandlerContext ctx;        PendingHandlerCallback next;        PendingHandlerCallback(AbstractChannelHandlerContext ctx) {            this.ctx = ctx;        }        abstract void execute();    }    private final class PendingHandlerAddedTask extends PendingHandlerCallback {        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {            super(ctx);        }        @Override        public void run() {            callHandlerAdded0(ctx);        }        @Override        void execute() {            EventExecutor executor = ctx.executor();            if (executor.inEventLoop()) {                callHandlerAdded0(ctx);            } else {                try {                    executor.execute(this);                } catch (RejectedExecutionException e) {                    if (logger.isWarnEnabled()) {                        logger.warn(                                "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",                                executor, ctx.name(), e);                    }                    remove0(ctx);                    ctx.setRemoved();                }            }        }    }

源码很简单,主要说明下execute()的实现,同样需要保证callHandlerAdded0在Channel特定的EventLoop中被调用,若在提交到线程池过程中抛出异常RejectedExecutionException,需要将当前handler从pipeline中移除。

那么何时执行队列中的任务?

前面已经提过,Channel注册完成后会自动调用,那么我们追溯下Channel注册的源码,查看AbstractChannel中register0方法可知,doRegister之后调用了invokeHandlerAddedIfNeeded方法。

pipeline.invokeHandlerAddedIfNeeded();

从DefaultChannelPipeline中追溯该方法:

final void invokeHandlerAddedIfNeeded() {        assert channel.eventLoop().inEventLoop();        if (firstRegistration) {            firstRegistration = false;            callHandlerAddedForAllHandlers();        }    }

从源码可知,invokeHandlerAddedIfNeeded保证只会被执行一次,firstRegistration默认为true,调用完成后被设置为false,下次调用则不会进入if代码块。下面看看callHandlerAddedForAllHandlers的实现:

private void callHandlerAddedForAllHandlers() {        final PendingHandlerCallback pendingHandlerCallbackHead;        synchronized (this) {            assert !registered;            registered = true;            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;            this.pendingHandlerCallbackHead = null;        }        /**         * 这部分代码必须在synchronized(...)块之外,否则在持有锁时调用handlerAdded(...),如果         * handlerAdded(...)尝试从EventLoop外部添加另一个handler,则会产生死锁。         */        PendingHandlerCallback task = pendingHandlerCallbackHead;        while (task != null) {            task.execute();            task = task.next;        }    }

源码解析:

  • 首先在加锁区将registered设为true,这样做意义是什么呢?由上文可知,只有当Channel注册完成之后,handler生命周期内对应的方法才会被立即调用,否则将封装在task中等待Channel注册完成后调用。registered就是piepline用来标识其关联的Channel是否已经注册的标志,registered被设为true后就不会被改变。因此将registered设为true,那么此时调用addLast,addFirst,remove等方法都将能检测到registered位true,PendingHandlerCallback链表中将不会有新节点会加入了,即添加handler或移除handler都不会排队。
  • 在加锁区将全局变量pendingHandlerCallbackHead的引用赋值给局部变量,再将全局变量置为空,这样做的好处是便于GC回收pendingHandlerCallbackHead,然后使用局部变量中保存的head引用遍历链表;当方法退出后,局部变量消亡,此时不再有引用指向链表,GC可回收整个链表中的节点。

上面代码中随处可见inEventLoop()的身影,这体现了netty线程模型实现机制,即初始化channel时,将channel注册到EventLoop,同时保证整个channel生命周期内的所有操作由EventLoop完成,这样避免了不必要的同步。后续在介绍netty线程模型时会更加详细的分析。

ADD_PENDING->ADD_COMPLETE or INIT->ADD_COMPLETE

当handler被添加到pipeline后,调用setAddComplete将context的handlerState设为ADD_COMPLETE。

如果handlerState已经为REMOVE_COMPLETE,不能再将handlerState设置为ADD_COMPLETE。

final void setAddComplete() {        for (;;) {            int oldState = handlerState;            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {                return;            }        }    }

任何状态的handlerState都可以被设置为REMOVE_COMPLETE。以下情况会将handlerState设为REMOVE_COMPLETE。

  • 正常情况下,调用remove从pipeline中移除handler,handlerRemoved调用完成之后调用setRemoved()将handlerState设为REMOVE_COMPLETE。
  • addLast,addFirst,addAfter,addBefore期间,当handler节点被添加到pipeline后,若后续操作(如调用handlerAdded)抛出异常,也会调用setRemoved()将handlerState设为REMOVE_COMPLETE。

这里简单介绍下AtomicIntegerFieldUpdater。

AtomicIntegerFieldUpdater基于反射的方式对指定类的指定volatile int字段进行原子更新。与AtomicInteger区别在于,AtomicIntegerFieldUpdater是工具类,使申明为int的变量具有原子操作的特性,而AtomicInteger直接代表int的原子类型,其内部持有int类型的变量,两者最终效果一样。

使用AtomicIntegerFieldUpdater有一些限制,具体如下:

  • 字段必须是volatile类型的,保证可见性
  • 字段必须对updater可见
  • 只能是实例变量,不能是类变量

此外,AtomicIntegerFieldUpdater的compareAndSet方法的保证弱于其他原子类,它只能保证同一个updater使用compareAndSet进行操作的原子性,假如存在其他updater使用compareAndSet更新同一变量,则结果是无法预料的。

构造handler链

本小节主要介绍pipeline如何构造由context组成的双向链表。当创建pipeline实例时,默认会构建出链表的头节点(HeadContext)和尾节点(TailContext),此时pipeline还不能实现任何特殊功能,还需加入自定义的handler。pipeline中提供了addLast,addFirst,addAfter,addBefore等将handler加入链表的方法,下面先看看一个特殊的handler,即ChannelInitializer如何加入链表。

添加ChannelInitializer

在初始化Bootstrap时,会调用 handle方法,此时需要创建ChannelInitializer实例,它提供了一个 initChannel方法供我们添加自定义的 ChannelHandler。下面从源码分析ChannelInitializer添加和卸载。

首先看看ChannelInitializer类图:

由类图可知,ChannelInitializer实际也是一个ChannelHandler,那么它是如何加入pipeline中的呢?

源头在Bootstrap的init方法,init的调用顺序为bind->doBind->initAndRegister->init,下面看init源码:

void init(Channel channel) throws Exception {        ChannelPipeline p = channel.pipeline();        p.addLast(config.handler());        ......    }

init中调用了pipeline的addLast方法添加handler,而config.handler()的返回值即为初始化 Bootstrap 调用 handler 设置的 ChannelInitializer 实例,就这样ChannelInitializer被加入到链表末端。现在链表结构如下:

下面分析下addLast()整个过程:

先看简易流程图:

进入加锁区,调用checkMultiplicity检查handler是否重复添加,看实现:

private static void checkMultiplicity(ChannelHandler handler) {        if (handler instanceof ChannelHandlerAdapter) {            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;            if (!h.isSharable() && h.added) {                throw new ChannelPipelineException(                        h.getClass().getName() +                        " is not a @Sharable handler, so can't be added or removed multiple times.");            }            h.added = true;        }    }

由实现可知,handler必须是ChannelHandlerAdapter的实例,否则不会被添加;如果handler不支持共享,且added为true,表明hander重复添加,抛出异常;否则,修改handler的added为true,标识handler已添加。目前只有@Sharable标注的handler支持重复添加,表示handler可共享。

接下来filterName检查handler的名称;如果用户未传入name,netty将自动生成唯一的name,并维护nameCaches;如果用户使用自定义的name,则检查是否存在同名。

private String filterName(String name, ChannelHandler handler) {        if (name == null) {            return generateName(handler);        }        checkDuplicateName(name);        return name;    }

检查通过后,构造context实例,注意,这里的context实例是DefaultChannelHandlerContext,DefaultChannelHandlerContext内部保存了每个节点context对应的handler实例;

context实例创建完成后,调用addLast0将当前context加入双向链表:

private void addLast0(AbstractChannelHandlerContext newCtx) {        AbstractChannelHandlerContext prev = tail.prev;        newCtx.prev = prev;        newCtx.next = tail;        prev.next = newCtx;        tail.prev = newCtx;    }

然后判断channel是否已注册,如果未注册,将callHandlerAdded0封装在PendingHandlerCallback中待Channel注册后执行;如果Channel已注册,判断当前线程是不是channel注册时绑定的EventLoop对应的线程,否则,将callHandlerAdded0封装为Runnable任务提交给当前channel的EventLoop执行。

最后,退出加锁区,如果上述检查通过,调用callHandlerAdded0,该方法会调用handler对应的handlerAdded方法,同时设置handlerState为ADD_COMPLETE。如果在callHandlerAdded0期间抛出异常,则需要从pipeline中移除当前handler。

上面提到DefaultChannelHandlerContext,简单说明下,其继承自AbstractChannelHandlerContext,只是DefaultChannelHandlerContext中保存了每个context对应的handler实例,同时提供了判断handler的inbound和outbound属性的方法。

private static boolean isInbound(ChannelHandler handler) {        return handler instanceof ChannelInboundHandler;    }    private static boolean isOutbound(ChannelHandler handler) {        return handler instanceof ChannelOutboundHandler;    }

这里ChannelInitializer仅仅实现了ChannelInboundHandler接口, 因此实例化context的inbound=true, outbound=false,pipeline就是通过这两个属性来决定一个handler用于处理入站事件或出站事件的。

添加自定义handler

由添加ChannelInitializer可知,callHandlerAdded0中会调用handler的handlerAdded方法,下面看ChannelInitializer中handlerAdded的实现:

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        if (ctx.channel().isRegistered()) {            initChannel(ctx);        }    }

在ChannelInitializer的handlerAdded中调用initChannel有一个好处,如果需要在当前ChannelInitializer中添加另一个ChannelInitializer,不会有任何排序意外,即所有的handler会安装预期的顺序添加。

下面看initChannel方法的实现:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.            try {                initChannel((C) ctx.channel());            } catch (Throwable cause) {                exceptionCaught(ctx, cause);            } finally {                remove(ctx);            }            return true;        }        return false;    }

首先判断initMap中是否存在当前context。当context已经存在时,ConcurrentMap的putIfAbsent返回已存在的值;如果initMap中不存在当前context,加入当前context并返回null,然后调用initChannel方法,该方法则是初始化Bootstrap时构造ChannelInitializer实例所实现的方法,然后将自定义的handler加入到pipeline的链表中。

最后调用remove方法移除当前handler,即ChannelInitializer,同时从initMap中移除。

private void remove(ChannelHandlerContext ctx) {        try {            ChannelPipeline pipeline = ctx.pipeline();            if (pipeline.context(this) != null) {                pipeline.remove(this);            }        } finally {            initMap.remove(ctx);        }    }

到此,自定义handler的添加过程分析完成。

说明:

netty源码有两个地方调用了initChannel(ChannelHandlerContext ctx),第一,添加ChannelInitializer时,在handlerAdded中调用,第二,Channel注册完成之后pipeline调用fireChannelRegistered触发Channel注册事件,此时,若ChannelInitializer还没有被移除,会调用其channelRegistered。看源码:

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {        if (initChannel(ctx)) {            ctx.pipeline().fireChannelRegistered();        } else {            ctx.fireChannelRegistered();        }    }

通常情况下,ChannelInitializer的channelRegistered方法绝不会被调用,因为handlerAdded调用之后,将会添加自定义handler,同时移除ChannelInitializer,fireChannelRegistered触发Channel注册事件自然也不会经过ChannelInitializer。如果ChannelInitializer没有被移除,那么会尝试调用initChannel,若initChannel返回true,可能加入的新的handler,则需要调用pipeline的fireChannelRegistered重新触发Channel注册事件,确保新添加的handler不会丢失事件。如:ChannelInitializer中添加了另外一个ChannelInitializerB,如果ChannelInitializerB非pipeline的第一个节点,在ChannelInitializerB中调用addFirst添加了一个handler作为链表第一个节点,那么这个handler就可能丢失事件,除非调用pipeline的fireChannelRegistered重新触发。

事件传播

下面将结合源码分析ChannelPipeline如何进行事件传播。

inBound事件传播

这里channel激活事件为例,当channel第一次注册时,当channel注册成功后,如果channel是active状态,则AbstractChannel中AbstractUnsafe的register0方法中将触发channel激活事件,调用pipeline的fireChannelActive方法。下面看fireChannelActive的方法实现:

@Override    public final ChannelPipeline fireChannelActive() {        AbstractChannelHandlerContext.invokeChannelActive(head);        return this;    }

由于channel激活事件是入站事件,调用AbstractChannelHandlerContext的invokeChannelActive将传入head节点,AbstractChannelHandlerContext再把事件转发给HeadContext的channelActive方法,下面看看HeadContext中channelActive的实现:

@Override        public void channelActive(ChannelHandlerContext ctx) throws Exception {            ctx.fireChannelActive();            readIfIsAutoRead();        }

channelActive中调用了ChannelHandlerContext的fireChannelActive:

@Override    public ChannelHandlerContext fireChannelActive() {        invokeChannelActive(findContextInbound());        return this;    }

由fireChannelActive可知,将调用findContextInbound查找当前节点之后的第一个入站事件handler,并调用invokeChannelActive将事件传递到该handler的channelActive中,这样即实现了事件的传播。直到到达链表的结尾TailContext,TailContext也是入站事件处理handler。由于TailContext的channelActive提供空实现,因此事件到此处理结束。

outBound事件传播

这里以connect事件为例,Bootstrap中为客户端提供connect方法,当调用Bootstrap的connect执行连接操作时,最后将调用AbstractChannel的connect方法:

@Override    public ChannelFuture connect(SocketAddress remoteAddress) {        return pipeline.connect(remoteAddress);    }

然后调用pipeline的connect方法:

@Override    public final ChannelFuture connect(SocketAddress remoteAddress) {        return tail.connect(remoteAddress);    }

connect事件是出站事件,因此pipeline将从handler链表的尾部开始处理,即调用TailContext的connect方法,由于TailContext未重写connect方法,那么调用父类AbstractChannelHandlerContext的connect方法:

@Override    public ChannelFuture connect(SocketAddress remoteAddress) {        return connect(remoteAddress, newPromise());    }

context中connect方法将调用findContextOutbound方法通过prev指针向前查找第一个处理outBound事件的handler,然后调用invokeConnect传播事件,最终将会达到HeadContext,即head节点。由于HeadContext实现了ChannelOutboundHandler,因此,它也可以处理出站事件。那么看看HeadContext的connect实现:

@Override        public void connect(                ChannelHandlerContext ctx,                SocketAddress remoteAddress, SocketAddress localAddress,                ChannelPromise promise) throws Exception {            unsafe.connect(remoteAddress, localAddress, promise);        }

由源码可知,最终调用unsafe的connect方法发起真正的连接。

总结,本文主要介绍了ChannelHandlerContext,并结合源码分析了其如何通过handlerState来标识ChannelHandler的状态;同时结合源码介绍了pipeline中handler链的构造,ChannelInitializer和自定义handler的添加;最后通过源码分析了netty中I/O事件如何通过pipeline的调度完成传播。

欢迎指出本文有误的地方,转载请注明原文出处

转载于:https://my.oschina.net/7001/blog/1037187

你可能感兴趣的文章
HDU-2044-一只小蜜蜂
查看>>
HDU-1394-Minimum Inversion Number
查看>>
df -h 卡住
查看>>
[转] createObjectURL方法 实现本地图片预览
查看>>
JavaScript—DOM编程核心.
查看>>
JavaScript碎片
查看>>
Bootstrap-下拉菜单
查看>>
soapUi 接口测试
查看>>
【c学习-12】
查看>>
工作中MySql的了解到的小技巧
查看>>
loadrunner-2-12日志解析
查看>>
C# Memcached缓存
查看>>
iOS开发NSLayoutConstraint代码自动布局
查看>>
正则表达式
查看>>
mysql [ERROR] Can't create IP socket: Permission denied
查看>>
PBRT笔记(4)——颜色和辐射度
查看>>
CustomView的手势缩放总结
查看>>
linux复制指定目录下的全部文件到另一个目录中,linux cp 文件夹
查看>>
CentOS yum安装mysql
查看>>
OceanBase笔记1:代码规范
查看>>