Donald_Draper 阅读(29) 评论(0)
Netty 通道处理器ChannelHandler和适配器定义ChannelHandlerAdapter:http://donald-draper.iteye.com/blog/2386891
Netty Inbound/Outbound通道处理器定义:http://donald-draper.iteye.com/blog/2387019
netty 简单Inbound通道处理器(SimpleChannelInboundHandler):http://donald-draper.iteye.com/blog/2387772
netty 消息编码器-MessageToByteEncoder:http://donald-draper.iteye.com/blog/2387832
netty 消息解码器-ByteToMessageDecoder:http://donald-draper.iteye.com/blog/2388088
引言:
前面几篇文章我们看了Netty通道处理器,在分析的过程中我们经常会遇到一个概念为Channel管道线,只知道与通道处理器上下文关联的通道处理器存放在Channel管道线,但一直不知道管道线具体是如何工作的,从今天起,我们将揭开Channel管道线的面纱。
我先从Netty服务端实例代码开始看起,服务端代码有这么一段:
 EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
	//ServerBootstrap,用于配置服务端,一般为ServerSocket通道
    ServerBootstrap serverBoot = new ServerBootstrap(); 
    serverBoot.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class) 
     .childHandler(new ChannelInitializer<SocketChannel>() { 
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
        	//添加通道处理器到通道关联的管道,准确的中文翻译为管道线, 此管道线与Mina中过滤链十分相似,
        	//ChannelInitializer用于配置通道的管道线,ChannelPipeline
        	 ChannelPipeline pipeline = ch.pipeline();
             if (sslCtx != null) {
            	 pipeline.addLast(sslCtx.newHandler(ch.alloc()));
             }
             pipeline.addLast(new LoggingHandler(LogLevel.INFO));
             pipeline.addLast(new EchoServerHandler());
         }
     })
     .option(ChannelOption.SO_BACKLOG, 128)//socket监听器连接队列大小、
     .childOption(ChannelOption.SO_KEEPALIVE, true); //保活,此配置针对ServerSocket通道接收连接产生的Socket通道
    InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
    // 绑定地址,开始监听
    ChannelFuture f = serverBoot.bind(inetSocketAddress).sync();
    log.info("=========Server is start=========");
    //等待,直到ServerSocket关闭
    f.channel().closeFuture().sync();

从上面可以看出,通道管道线ChannelPipeline是从Socket通道获取,而Socket通道
为ChannelInitializer的参数类型。到ChannelInitializer定义中,并没有发现ChannelPipeline到的相关信息,到ChannelPipeline定义中的java doc 接口定义中有这么一段
Each channel has its own pipeline and it is created automatically when a new channel is created.每个通道拥有自己的管道,当通道创建时,管道自动创建。有这么一句话,我们找channel相关的信息,在上述代码中有个ServerSocket通道NioServerSocketChannel,查看定义:
public class NioServerSocketChannel extends AbstractNioMessageChannel
    implements io.netty.channel.socket.ServerSocketChannel

没有管道相关信息,再往上找
public abstract class AbstractNioMessageChannel extends AbstractNioChannel

仍然没有,再往上找
/**
 * A skeletal {@link Channel} implementation.
 */
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;//通道管道线
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    private volatile EventLoop eventLoop;
    private volatile boolean registered;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;

    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    protected DefaultChannelPipeline newChannelPipeline()
    {
        return new DefaultChannelPipeline(this);
    }
}

到这里终于找到我们想要的了DefaultChannelPipeline
/**
 * The default {@link ChannelPipeline} implementation.  It is usually created
 * by a {@link Channel} implementation when the {@link Channel} is created.
 */
public class DefaultChannelPipeline implements ChannelPipeline {

再看管道接口的定义
public interface ChannelPipeline
   extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> 

再看Channel管道线接口定义之前,我们先看一下ChannelInboundInvoker和ChannelOutboundInvoker接口的定义。

public interface ChannelInboundInvoker {

    /**
     * A {@link Channel} was registered to its {@link EventLoop}.
     *通道注册到事件循环
     * This will result in having the  {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)} method
     * called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Inbound通道处理器的channelRegistere方法
     */
    ChannelInboundInvoker fireChannelRegistered();

    /**
     * A {@link Channel} was unregistered from its {@link EventLoop}.
     *通道从事件循环移除
     * This will result in having the  {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)} method
     * called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Inbound通道处理器的channelUnregistered方法
     */
    ChannelInboundInvoker fireChannelUnregistered();

    /**
     * A {@link Channel} is active now, which means it is connected.
     *通道激活,意味着已经连接诶
     * This will result in having the  {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)} method
     * called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Inbound通道处理器的channelActive方法
     */
    ChannelInboundInvoker fireChannelActive();

    /**
     * A {@link Channel} is inactive now, which means it is closed.
     *通道以失效,意味通道已经关闭
     * This will result in having the  {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} method
     * called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
    此方法将会触发管道线上的下一个Inbound通道处理器的channelInactive方法
     */
    ChannelInboundInvoker fireChannelInactive();

    /**
     * A {@link Channel} received an {@link Throwable} in one of its inbound operations.
     *通道在inbound的相关操作中,收到一个异常
     * This will result in having the  {@link ChannelInboundHandler#exceptionCaught(ChannelHandlerContext, Throwable)}
     * method  called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Inbound通道处理器的exceptionCaught方法
     */
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);

    /**
     * A {@link Channel} received an user defined event.
     *通道接收一个用户自定义事件
     * This will result in having the  {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
     * method  called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
    此方法将会触发管道线上的下一个Inbound通道处理器的userEventTriggeret方法
     */
    ChannelInboundInvoker fireUserEventTriggered(Object event);

    /**
     * A {@link Channel} received a message.
     *通道接收一个消息
     * This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}
     * method  called of the next {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Inbound通道处理器的channelRead方法
     */
    ChannelInboundInvoker fireChannelRead(Object msg);

    /**
     * Triggers an {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)}
     * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     触发管道线上的下一个Inbound通道处理器的channelReadComplete事件
     */
    ChannelInboundInvoker fireChannelReadComplete();

    /**
     * Triggers an {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}
     * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     触发管道线上的下一个Inbound通道处理器的channelReadComplete事件
     */
    ChannelInboundInvoker fireChannelWritabilityChanged();
}


从Inbound通道Invoker的定义来看,ChannelInboundInvoker主要是触发管道线ChannelPipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法。ChannelInboundInvoker有点Mina过滤器的意味。



再来看ChannelOutboundInvoker接口定义:

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FutureListener;

import java.net.ConnectException;
import java.net.SocketAddress;

public interface ChannelOutboundInvoker {

    /**
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     请求绑定给定socket地址,当操作完成,无论成功或者失败,通知ChannelFuture
     * 
     * This will result in having the
     * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method
     * called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
      此方法将会触发管道线上的下一个Outbound通道处理器的bind方法
     */
    ChannelFuture bind(SocketAddress localAddress);

    /**
     * Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     请求连接给定的socket地址,当操作完成,无论成功或者失败,通知ChannelFuture
     * <p>
     * If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with
     * a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException}
     * will be used.
     如果连接因为超时失败,ChannelFuture将会获取一个超时异常。如果是拒绝连接,则为连接异常。
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Outbound通道处理器的connect方法
     */
    ChannelFuture connect(SocketAddress remoteAddress);

    /**
     * Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
     * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
     * an error.
     绑定本地socket地址,连接远端socket地址,当操作完成,无论成功或者失败,通知ChannelFuture。
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Outbound通道处理器的connect方法
     */
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    /**
     * Request to disconnect from the remote peer and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of an error.
     断开远端的peer连接,当操作完成,无论成功或者失败,通知ChannelFuture。
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Outbound通道处理器的disconnect方法
     */
    ChannelFuture disconnect();
     
    /**
     * Request to close the {@link Channel} and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of
     * an error.
     *请求关闭通道,当操作完成,无论成功或者失败,通知ChannelFuture。
     * After it is closed it is not possible to reuse it again.
     * <p>
     在关闭之后,不能重新使用。
     * This will result in having the
     * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Outbound通道处理器的close方法
     */
    ChannelFuture close();

    /**
     * Request to deregister from the previous assigned {@link EventExecutor} and notify the
     * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
     * an error.
     请求从先前分配的EventExecutor中,注销,当操作完成,无论成功或者失败,通知ChannelFuture。
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Outbound通道处理器的deregister方法
     *
     */
    ChannelFuture deregister();

    /**
    与上述对应的方法,基本相同,只不过,完成后,给定的ChannelPromise将会被唤醒
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     * The given {@link ChannelPromise} will be notified.
     给定的ChannelPromise将会被唤醒
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method
     * called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     此方法将会触发管道线上的下一个Outbound通道处理器的bind方法
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    /**
    与上述对应的方法,基本相同,只不过,完成后,给定的ChannelPromise将会被唤醒
     * Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     * The given {@link ChannelFuture} will be notified.
     *给定的ChannelPromise将会被唤醒
     * <p>
     * If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with
     * a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException}
     * will be used.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

    /**
    与上述对应的方法,基本相同,只不过,完成后,给定的ChannelPromise将会被唤醒
     * Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
     * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
     * an error.
     *
     * The given {@link ChannelPromise} will be notified and also returned.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    /**
    与上述对应的方法,基本相同,只不过,完成后,给定的ChannelPromise将会被唤醒
     * Request to disconnect from the remote peer and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of an error.
     *
     * The given {@link ChannelPromise} will be notified.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture disconnect(ChannelPromise promise);

    /**
    与上述对应的方法,基本相同,只不过,完成后,给定的ChannelPromise将会被唤醒
     * Request to close the {@link Channel} and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of
     * an error.
     *
     * After it is closed it is not possible to reuse it again.
     * The given {@link ChannelPromise} will be notified.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture close(ChannelPromise promise);

    /**
    与上述对应的方法,基本相同,只不过,完成后,给定的ChannelPromise将会被唤醒
     * Request to deregister from the previous assigned {@link EventExecutor} and notify the
     * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
     * an error.
     *
     * The given {@link ChannelPromise} will be notified.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture deregister(ChannelPromise promise);

    /**
     * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
     * {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} event if data was
     * read, and triggers a
     * {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the
     * handler can decide to continue reading.  If there's a pending read operation already, this method does nothing.
     * <p>
     请求从第一个inbound buf读取数据,如果有数据读取,则触发Inbound通道处理器的channelRead方法事件和#channelReadComplete
     事件,以便处理器可以决定是否可以继续读取数据。如果一个读操作正在发生,则此方法不做任何事情。
     * This will result in having the
     * {@link ChannelOutboundHandler#read(ChannelHandlerContext)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
      此方法将会触发管道线上的下一个Outbound通道处理器的read方法
     */
    ChannelOutboundInvoker read();

    /**
     * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     请求通过通道处理器上下文发送消息到Channel管道线。此方不会请求实际的刷新,如果你想请求刷新待发送的数据
     到transport,必须调用flush方法。
     */
    ChannelFuture write(Object msg);

    /**
    与上述对应的方法,基本相同,只不过,完成后,给定的ChannelPromise将会被唤醒
     * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     */
    ChannelFuture write(Object msg, ChannelPromise promise);

    /**
     * Request to flush all pending messages via this ChannelOutboundInvoker.
     通过Outbound通道Invoker,刷新待发送的消息到transport
     */
    ChannelOutboundInvoker flush();

    /**
     * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
     此方法为#write(Object, ChannelPromise)和#flush的快捷方式
     */
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    /**
     * Shortcut for call {@link #write(Object)} and {@link #flush()}.
     此方法为#write(Object)和#flush的快捷方式
     */
    ChannelFuture writeAndFlush(Object msg);

    /**
     * Return a new {@link ChannelPromise}.
     创建一个ChannelPromise
     */
    ChannelPromise newPromise();

    /**
     * Return an new {@link ChannelProgressivePromise}
     创建一个ChannelProgressivePromise
     */
    ChannelProgressivePromise newProgressivePromise();

    /**
     * Create a new {@link ChannelFuture} which is marked as succeeded already. So {@link ChannelFuture#isSuccess()}
     * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     创建一个已经标记成功的通道结果。ChannelFuture#isSuccess方法返回ture。所有添加到通道结果的监听器,将会被直接唤醒。
     每个阻塞的方法,将会无阻塞返回。
     */
    ChannelFuture newSucceededFuture();

    /**
     * Create a new {@link ChannelFuture} which is marked as failed already. So {@link ChannelFuture#isSuccess()}
     * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     创建一个已经标记失败的通道结果。ChannelFuture#isSuccess方法返回false。所有添加到通道结果的监听器,将会被直接唤醒。
     每个阻塞的方法,将会无阻塞返回。
     */
    ChannelFuture newFailedFuture(Throwable cause);

    /**
     * Return a special ChannelPromise which can be reused for different operations.
     返回一个特别的ChannelPromise,可以重用于不同的操作。
     * <p>
     * It's only supported to use
     * it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)}.
     仅仅支持ChannelOutboundInvoker#write(Object, ChannelPromise)方法
     * 

     * 
     * Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used
     * if you want to save an object allocation for every write operation. You will not be able to detect if the
     * operation  was complete, only if it failed as the implementation will call
     * {@link ChannelPipeline#fireExceptionCaught(Throwable)} in this case.
     注意,返回的ChannelPromise,不支持大部分的操作,如果在为每个写操作保存一个分配的对象时,应该使用此方法。
     如果操作已经完成,当且仅当,由于调用ChannelPipeline#fireExceptionCaught引起的失败,你不能够探测时,可以使用
     此方法创建的ChannelPromise
     * 

     * [b]Be aware this is an expert feature and should be used with care![/b]
     请谨慎使用。
     */
    ChannelPromise voidPromise();
}

Outbound通道Invoker ChannelOutboundInvoker主要是触发触发管道线ChannelPipeline上的下一个Outbound通道处理器ChannelOnboundHandler的相关方法,同时增加了一下通道结果创建方法,ChannelOutboundInvoker也有点Mina过滤器的意味,只不过不像ChannelInboundInvoker的方法命名那么相似。在Outbound通道Invoker的方法定义中,我们看到有很多类型的返回结果,比如:ChannelFuture,ChannelPromise,ChannelProgressivePromise这个我们放在下一篇来看。


总结:
每个通道Channel拥有自己的管道Pipeline,当通道创建时,管道自动创建,默认为DefaultChannelPipeline。Inbound通道Invoker ChannelInboundInvoker主要是触发管道线ChannelPipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法。ChannelInboundInvoker有点Mina过滤器的意味。Outbound通道Invoker ChannelOutboundInvoker主要是触发触发管道线ChannelPipeline上的下一个Outbound通道处理器ChannelOnboundHandler的相关方法,同时增加了一下通道结果创建方法,
ChannelOutboundInvoker也有点Mina过滤器的意味,只不过不像ChannelInboundInvoker的方法命名那么相似。