ChannelHandlerContext简介
ChannelHandlerContext的主要功能是使ChannelHandler能够与所属ChannelPipeline中的其他ChannelHandler进行交互,同时,使ChannelHandler可以在上游或下游传递事件,执行I/O操作,动态修改流水线或使用AttributeKeys存储ChannelHandler特有的信息等等。
先简单看看类图:
AbstractChannelHandlerContext为ChannelHandlerContext提供了很多默认实现;实现ChannelInboundInvoker和ChannelOutboundInvoker使得context可以传播入站事件和出站事件,实现ResourceLeakHint使得ChannelHandlerContext具备内存泄漏检测的能力。下面开始结合源码分析ChannelPipeline和ChannelHandlerContext的实现原理。
管理handler状态
AbstractChannelHandlerContext中使用handlerState来标识ChannelHandler的状态。
private volatile int handlerState = INIT; /** * ChannelHandler的handlerAdded(ChannelHandlerContext)方法即将被调用. */ private static final int ADD_PENDING = 1; /** * ChannelHandler的handlerAdded(ChannelHandlerContext)方法调用完成 */ private static final int ADD_COMPLETE = 2; /** * ChannelHandler的handlerRemoved(ChannelHandlerContext)方法调用完成 */ private static final int REMOVE_COMPLETE = 3; /** * 初始状态 */ private static final int INIT = 0;
INIT:初始状态;
ADD_PENDING :用于标识handler添加的中间状态,表示handler已添加到pipeline链表上,但handlerAdded仍未调用。之所以有这个状态,是因为当ordered为false时,此状态的handler也可以处理事;ordered为true,意味着EventExecutor需要根据pipeline中已经添加的handler顺序处理事件;
ADD_COMPLETE :ChannelHandler的handlerAdded已经被调用,此状态的handler可以处理事件;
REMOVE_COMPLETE:ChannelHandler的handlerRemoved已经被调用,此状态的handler不能再处理事件。
状态转化:
INIT->ADD_PENDING
方法setAddPending用于将handlerState从INIT转化为ADD_PENDING ,且setAddPending只会在addLast,addFirst,addAfter,addBefore等添加handler的方法中才可能被调用。进入源码:
final void setAddPending() { boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING); assert updated; }
以下两种情况下会调用setAddPending将handlerState设置为ADD_PENDING:
- Channel未注册,将callHandlerAdded0封装在PendingHandlerCallback中待Channel注册后执行;
- 当前线程不是channel注册时绑定的EventLoop,将callHandlerAdded0封装为Runnable任务提交给当前channel的EventLoop执行。
下面以DefaultChannelPipeline中addLast源码来分析:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }
addLast的作用是将自定义handler添加到pipeline双向链表尾部,但是它不保证handler的handlerAdded方法会被立即调用。由源码可知,当registered为false时,调用setAddPending将handlerState设置为为ADD_PENDING,同时调用callHandlerCallbackLater封装task,用于Channel注册完成后调用callHandlerAdded0,在callHandlerAdded0中调用handlerAdded,并将handlerState设置为ADD_COMPLETE;如果Channel已经注册,即registered为true,但当前executor 不是channel注册时绑定的EventLoop,调用setAddPending将handlerState设置为为ADD_PENDING,同时将callHandlerAdded0封装在Runnable任务中提交给当前channel的EventLoop执行。
当Channel未注册时,使用PendingHandlerCallback来封装Channel注册完成后需要执行的任务,下面看实现:
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); PendingHandlerCallback pending = pendingHandlerCallbackHead; if (pending == null) { pendingHandlerCallbackHead = task; } else { while (pending.next != null) { pending = pending.next; } pending.next = task; } }
由callHandlerCallbackLater方法可知,ChannelPipeline维护了一个链表,使用PendingHandlerCallback作为节点,PendingHandlerCallback同时也是一个Runnable,可直接提交到EventLoop执行。pendingHandlerCallbackHead用于保存head节点的引用,方便后续遍历链表。
PendingHandlerCallback中维护了context和next节点的引用,并提供了抽象方法execute()供子类实现,主要PendingHandlerAddedTask和PendingHandlerRemovedTask两个实现,分别用于添加handler和移除handler。
下面以PendingHandlerAddedTask源码为例:
private abstract static class PendingHandlerCallback implements Runnable { final AbstractChannelHandlerContext ctx; PendingHandlerCallback next; PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; } abstract void execute(); } private final class PendingHandlerAddedTask extends PendingHandlerCallback { PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); } @Override public void run() { callHandlerAdded0(ctx); } @Override void execute() { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", executor, ctx.name(), e); } remove0(ctx); ctx.setRemoved(); } } } }
源码很简单,主要说明下execute()的实现,同样需要保证callHandlerAdded0在Channel特定的EventLoop中被调用,若在提交到线程池过程中抛出异常RejectedExecutionException,需要将当前handler从pipeline中移除。
那么何时执行队列中的任务?
前面已经提过,Channel注册完成后会自动调用,那么我们追溯下Channel注册的源码,查看AbstractChannel中register0方法可知,doRegister之后调用了invokeHandlerAddedIfNeeded方法。
pipeline.invokeHandlerAddedIfNeeded();
从DefaultChannelPipeline中追溯该方法:
final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; callHandlerAddedForAllHandlers(); } }
从源码可知,invokeHandlerAddedIfNeeded保证只会被执行一次,firstRegistration默认为true,调用完成后被设置为false,下次调用则不会进入if代码块。下面看看callHandlerAddedForAllHandlers的实现:
private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; this.pendingHandlerCallbackHead = null; } /** * 这部分代码必须在synchronized(...)块之外,否则在持有锁时调用handlerAdded(...),如果 * handlerAdded(...)尝试从EventLoop外部添加另一个handler,则会产生死锁。 */ PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { task.execute(); task = task.next; } }
源码解析:
- 首先在加锁区将registered设为true,这样做意义是什么呢?由上文可知,只有当Channel注册完成之后,handler生命周期内对应的方法才会被立即调用,否则将封装在task中等待Channel注册完成后调用。registered就是piepline用来标识其关联的Channel是否已经注册的标志,registered被设为true后就不会被改变。因此将registered设为true,那么此时调用addLast,addFirst,remove等方法都将能检测到registered位true,PendingHandlerCallback链表中将不会有新节点会加入了,即添加handler或移除handler都不会排队。
- 在加锁区将全局变量pendingHandlerCallbackHead的引用赋值给局部变量,再将全局变量置为空,这样做的好处是便于GC回收pendingHandlerCallbackHead,然后使用局部变量中保存的head引用遍历链表;当方法退出后,局部变量消亡,此时不再有引用指向链表,GC可回收整个链表中的节点。
上面代码中随处可见inEventLoop()的身影,这体现了netty线程模型实现机制,即初始化channel时,将channel注册到EventLoop,同时保证整个channel生命周期内的所有操作由EventLoop完成,这样避免了不必要的同步。后续在介绍netty线程模型时会更加详细的分析。
ADD_PENDING->ADD_COMPLETE or INIT->ADD_COMPLETE
当handler被添加到pipeline后,调用setAddComplete将context的handlerState设为ADD_COMPLETE。
如果handlerState已经为REMOVE_COMPLETE,不能再将handlerState设置为ADD_COMPLETE。
final void setAddComplete() { for (;;) { int oldState = handlerState; if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { return; } } }
任何状态的handlerState都可以被设置为REMOVE_COMPLETE。以下情况会将handlerState设为REMOVE_COMPLETE。
- 正常情况下,调用remove从pipeline中移除handler,handlerRemoved调用完成之后调用setRemoved()将handlerState设为REMOVE_COMPLETE。
- addLast,addFirst,addAfter,addBefore期间,当handler节点被添加到pipeline后,若后续操作(如调用handlerAdded)抛出异常,也会调用setRemoved()将handlerState设为REMOVE_COMPLETE。
这里简单介绍下AtomicIntegerFieldUpdater。
AtomicIntegerFieldUpdater基于反射的方式对指定类的指定volatile int字段进行原子更新。与AtomicInteger区别在于,AtomicIntegerFieldUpdater是工具类,使申明为int的变量具有原子操作的特性,而AtomicInteger直接代表int的原子类型,其内部持有int类型的变量,两者最终效果一样。
使用AtomicIntegerFieldUpdater有一些限制,具体如下:
- 字段必须是volatile类型的,保证可见性
- 字段必须对updater可见
- 只能是实例变量,不能是类变量
此外,AtomicIntegerFieldUpdater的compareAndSet方法的保证弱于其他原子类,它只能保证同一个updater使用compareAndSet进行操作的原子性,假如存在其他updater使用compareAndSet更新同一变量,则结果是无法预料的。
构造handler链
本小节主要介绍pipeline如何构造由context组成的双向链表。当创建pipeline实例时,默认会构建出链表的头节点(HeadContext)和尾节点(TailContext),此时pipeline还不能实现任何特殊功能,还需加入自定义的handler。pipeline中提供了addLast,addFirst,addAfter,addBefore等将handler加入链表的方法,下面先看看一个特殊的handler,即ChannelInitializer如何加入链表。
添加ChannelInitializer
在初始化Bootstrap时,会调用 handle方法,此时需要创建ChannelInitializer实例,它提供了一个 initChannel方法供我们添加自定义的 ChannelHandler。下面从源码分析ChannelInitializer添加和卸载。
首先看看ChannelInitializer类图:
由类图可知,ChannelInitializer实际也是一个ChannelHandler,那么它是如何加入pipeline中的呢?
源头在Bootstrap的init方法,init的调用顺序为bind->doBind->initAndRegister->init,下面看init源码:
void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(config.handler()); ...... }
init中调用了pipeline的addLast方法添加handler,而config.handler()的返回值即为初始化 Bootstrap 调用 handler 设置的 ChannelInitializer 实例,就这样ChannelInitializer被加入到链表末端。现在链表结构如下:
下面分析下addLast()整个过程:
先看简易流程图:
进入加锁区,调用checkMultiplicity检查handler是否重复添加,看实现:
private static void checkMultiplicity(ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } h.added = true; } }
由实现可知,handler必须是ChannelHandlerAdapter的实例,否则不会被添加;如果handler不支持共享,且added为true,表明hander重复添加,抛出异常;否则,修改handler的added为true,标识handler已添加。目前只有@Sharable标注的handler支持重复添加,表示handler可共享。
接下来filterName检查handler的名称;如果用户未传入name,netty将自动生成唯一的name,并维护nameCaches;如果用户使用自定义的name,则检查是否存在同名。
private String filterName(String name, ChannelHandler handler) { if (name == null) { return generateName(handler); } checkDuplicateName(name); return name; }
检查通过后,构造context实例,注意,这里的context实例是DefaultChannelHandlerContext,DefaultChannelHandlerContext内部保存了每个节点context对应的handler实例;
context实例创建完成后,调用addLast0将当前context加入双向链表:
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
然后判断channel是否已注册,如果未注册,将callHandlerAdded0封装在PendingHandlerCallback中待Channel注册后执行;如果Channel已注册,判断当前线程是不是channel注册时绑定的EventLoop对应的线程,否则,将callHandlerAdded0封装为Runnable任务提交给当前channel的EventLoop执行。
最后,退出加锁区,如果上述检查通过,调用callHandlerAdded0,该方法会调用handler对应的handlerAdded方法,同时设置handlerState为ADD_COMPLETE。如果在callHandlerAdded0期间抛出异常,则需要从pipeline中移除当前handler。
上面提到DefaultChannelHandlerContext,简单说明下,其继承自AbstractChannelHandlerContext,只是DefaultChannelHandlerContext中保存了每个context对应的handler实例,同时提供了判断handler的inbound和outbound属性的方法。
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
这里ChannelInitializer仅仅实现了ChannelInboundHandler接口, 因此实例化context的inbound=true, outbound=false,pipeline就是通过这两个属性来决定一个handler用于处理入站事件或出站事件的。
添加自定义handler
由添加ChannelInitializer可知,callHandlerAdded0中会调用handler的handlerAdded方法,下面看ChannelInitializer中handlerAdded的实现:
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { initChannel(ctx); } }
在ChannelInitializer的handlerAdded中调用initChannel有一个好处,如果需要在当前ChannelInitializer中添加另一个ChannelInitializer,不会有任何排序意外,即所有的handler会安装预期的顺序添加。
下面看initChannel方法的实现:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { remove(ctx); } return true; } return false; }
首先判断initMap中是否存在当前context。当context已经存在时,ConcurrentMap的putIfAbsent返回已存在的值;如果initMap中不存在当前context,加入当前context并返回null,然后调用initChannel方法,该方法则是初始化Bootstrap时构造ChannelInitializer实例所实现的方法,然后将自定义的handler加入到pipeline的链表中。
最后调用remove方法移除当前handler,即ChannelInitializer,同时从initMap中移除。
private void remove(ChannelHandlerContext ctx) { try { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } finally { initMap.remove(ctx); } }
到此,自定义handler的添加过程分析完成。
说明:
netty源码有两个地方调用了initChannel(ChannelHandlerContext ctx),第一,添加ChannelInitializer时,在handlerAdded中调用,第二,Channel注册完成之后pipeline调用fireChannelRegistered触发Channel注册事件,此时,若ChannelInitializer还没有被移除,会调用其channelRegistered。看源码:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); } else { ctx.fireChannelRegistered(); } }
通常情况下,ChannelInitializer的channelRegistered方法绝不会被调用,因为handlerAdded调用之后,将会添加自定义handler,同时移除ChannelInitializer,fireChannelRegistered触发Channel注册事件自然也不会经过ChannelInitializer。如果ChannelInitializer没有被移除,那么会尝试调用initChannel,若initChannel返回true,可能加入的新的handler,则需要调用pipeline的fireChannelRegistered重新触发Channel注册事件,确保新添加的handler不会丢失事件。如:ChannelInitializer中添加了另外一个ChannelInitializerB,如果ChannelInitializerB非pipeline的第一个节点,在ChannelInitializerB中调用addFirst添加了一个handler作为链表第一个节点,那么这个handler就可能丢失事件,除非调用pipeline的fireChannelRegistered重新触发。
事件传播
下面将结合源码分析ChannelPipeline如何进行事件传播。
inBound事件传播
这里channel激活事件为例,当channel第一次注册时,当channel注册成功后,如果channel是active状态,则AbstractChannel中AbstractUnsafe的register0方法中将触发channel激活事件,调用pipeline的fireChannelActive方法。下面看fireChannelActive的方法实现:
@Override public final ChannelPipeline fireChannelActive() { AbstractChannelHandlerContext.invokeChannelActive(head); return this; }
由于channel激活事件是入站事件,调用AbstractChannelHandlerContext的invokeChannelActive将传入head节点,AbstractChannelHandlerContext再把事件转发给HeadContext的channelActive方法,下面看看HeadContext中channelActive的实现:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); }
channelActive中调用了ChannelHandlerContext的fireChannelActive:
@Override public ChannelHandlerContext fireChannelActive() { invokeChannelActive(findContextInbound()); return this; }
由fireChannelActive可知,将调用findContextInbound查找当前节点之后的第一个入站事件handler,并调用invokeChannelActive将事件传递到该handler的channelActive中,这样即实现了事件的传播。直到到达链表的结尾TailContext,TailContext也是入站事件处理handler。由于TailContext的channelActive提供空实现,因此事件到此处理结束。
outBound事件传播
这里以connect事件为例,Bootstrap中为客户端提供connect方法,当调用Bootstrap的connect执行连接操作时,最后将调用AbstractChannel的connect方法:
@Override public ChannelFuture connect(SocketAddress remoteAddress) { return pipeline.connect(remoteAddress); }
然后调用pipeline的connect方法:
@Override public final ChannelFuture connect(SocketAddress remoteAddress) { return tail.connect(remoteAddress); }
connect事件是出站事件,因此pipeline将从handler链表的尾部开始处理,即调用TailContext的connect方法,由于TailContext未重写connect方法,那么调用父类AbstractChannelHandlerContext的connect方法:
@Override public ChannelFuture connect(SocketAddress remoteAddress) { return connect(remoteAddress, newPromise()); }
context中connect方法将调用findContextOutbound方法通过prev指针向前查找第一个处理outBound事件的handler,然后调用invokeConnect传播事件,最终将会达到HeadContext,即head节点。由于HeadContext实现了ChannelOutboundHandler,因此,它也可以处理出站事件。那么看看HeadContext的connect实现:
@Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
由源码可知,最终调用unsafe的connect方法发起真正的连接。
总结,本文主要介绍了ChannelHandlerContext,并结合源码分析了其如何通过handlerState来标识ChannelHandler的状态;同时结合源码介绍了pipeline中handler链的构造,ChannelInitializer和自定义handler的添加;最后通过源码分析了netty中I/O事件如何通过pipeline的调度完成传播。
欢迎指出本文有误的地方,转载请注明原文出处