Donald_Draper 阅读(33) 评论(0)
Mina IoService接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378271
Mina Io监听器接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378315
Mina 抽象polling监听器:http://donald-draper.iteye.com/blog/2378649
引言:
   上一篇文章看了抽象polling监听器,先来回顾一下:
AbstractPollingIoAcceptor主要变量为Io处理器processor,地址绑定请求队列registerQueue,地址解绑请求队列cancelQueue,监听器绑定的socket地址,与ServerSocketChannel映射关系boundHandles-Map,监听工作线程Acceptor引用acceptorRef。构造AbstractPollingIoAcceptor,主要是初始化会话配置,Io处理器类型,IO异步事件执行器为空的话默认为CachedThreadPool,然后初始化选择器。地址绑定过程为,创建绑定操作结果,注册绑定请求到注册地址绑定请求队列,创建监听器Acceptor实例并执行。Acceptor主要功能为,地址绑定,监听连接请求,解绑地址,实际工作逻辑为:如果监听器AbstractPollingIoAcceptor已经初始化,首先根据地址绑定队列中的绑定请求,打开一个ServerSocketChannle,注册接收事件OP_ACCEPT到选择器,并将绑定地址与ServerSocketChannle映射管理添加到地址绑定映射集合;执行选择操作,如果实际绑定地址为空,则置空acceptorRef;如果接收连接事件发生,则处理连接请求,遍历接收事件就绪的ServerSocketChannel,ServerSocketChannel创建一个关联processor的会话,初始化会话,添加会话到会话关联io处理器;检查是否有地址解绑请求,如果有解绑请求CancellationRequest,从绑定socket地址与ServerSocketChannle映射map中,移除绑定的socket地址,关闭ServerSocketChannle;最后检查监听器是否正在关闭,如果acceptor正在关闭,则关闭关联processor。Acceptor和AbstractPollingIoAcceptor的关系,与AbstractPollingIoProcessor和Processor的关系很像。地址解绑过程,首先根据解绑地址创建AcceptorOperationFuture,添加到解绑队列,启动Acceptor线程,完成实际解绑工作。
AbstractPollingIoAcceptor所有的工作(地址绑定,接收连接,创建会话,添加会话到IO处理器,解绑地址,释放监听器资源)都是在Acceptor线程里完成。
今天来看一下抽象polling监听器的实现NioSocketAcceptor
/**
 * {@link IoAcceptor} for socket transport (TCP/IP).  This class
 * handles incoming TCP/IP based socket connections.
 *socket监听器,接收socket连接诶。
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
implements SocketAcceptor {
    private volatile Selector selector;//选择器
    private volatile SelectorProvider selectorProvider = null;//选择器提供者
}

从上来看socket监听,有两个内部变量为选择器selector和选择器提供selectorProvider。
再来看构造:
 /**
  * Constructor for {@link NioSocketAcceptor} using default parameters 
  (multiple thread model).
  默认参数,多线程模型
  */
 public NioSocketAcceptor() {
     super(new DefaultSocketSessionConfig(), NioProcessor.class);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
 }

 /**
  * Constructor for {@link NioSocketAcceptor} using default parameters, and
  * given number of {@link NioProcessor} for multithreading I/O operations.
  * 使用默认参数构造,创建processorCount个NioProcessor处理器线程,多线程处理IO操作
  * @param processorCount the number of processor to create and place in a
  * {@link SimpleIoProcessorPool}
  */
 public NioSocketAcceptor(int processorCount) {
     super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
 }

 /**
  *  Constructor for {@link NioSocketAcceptor} with default configuration but a
  *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
  *  {@link IoService} of the same type.
  多服务共享同一个处理器processor,
  * @param processor the processor to use for managing I/O events
  */
 public NioSocketAcceptor(IoProcessor<NioSession> processor) {
     super(new DefaultSocketSessionConfig(), processor);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
 }

 /**
  *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling
  *  connection events and a given {@link IoProcessor} for handling I/O events, useful for
  *  sharing the same processor and executor over multiple {@link IoService} of the same type.
  多服务共享同一个处理器processor,用给定执行器处理连接事件和,给定的处理器处理IO事件。
  * @param executor the executor for connection
  * @param processor the processor for I/O operations
  */
 public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
     super(new DefaultSocketSessionConfig(), executor, processor);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
 }

 /**
  * Constructor for {@link NioSocketAcceptor} using default parameters, and
  * given number of {@link NioProcessor} for multithreading I/O operations, and
  * a custom SelectorProvider for NIO
  *多处理器处理多线程的IO操作
  * @param processorCount the number of processor to create and place in a
  * @param selectorProvider teh SelectorProvider to use
  * {@link SimpleIoProcessorPool}
  */
 public NioSocketAcceptor(int processorCount, SelectorProvider selectorProvider) {
     super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount, selectorProvider);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
     this.selectorProvider = selectorProvider;
 }

上面几个构造方法与AbstractPollingIoAcceptor基本相同。
再来看其他操作:
   /**
     * {@inheritDoc}
     */
    @Override
    protected void init() throws Exception {
        selector = Selector.open();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    protected void init(SelectorProvider selectorProvider) throws Exception {
        this.selectorProvider = selectorProvider;

        if (selectorProvider == null) {
            selector = Selector.open();
        } else {
            selector = selectorProvider.openSelector();
        }
    }

从上面两个方法来看,init方法主要工作为打开一个选择器selector。
  
 /**
     * {@inheritDoc}
     */
    @Override
    protected void destroy() throws Exception {
        if (selector != null) {
            selector.close();
        }
    }
    /**
     * {@inheritDoc}
     */
    public TransportMetadata getTransportMetadata() {
        return NioSocketSession.METADATA;
    }
   /**
     * {@inheritDoc}
     */
    @Override
    public InetSocketAddress getLocalAddress() {
        return (InetSocketAddress) super.getLocalAddress();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public InetSocketAddress getDefaultLocalAddress() {
        return (InetSocketAddress) super.getDefaultLocalAddress();
    }

    /**
     * {@inheritDoc}
     */
    public void setDefaultLocalAddress(InetSocketAddress localAddress) {
        setDefaultLocalAddress((SocketAddress) localAddress);
    }

来看打开一个socket地址:
   /**
     * {@inheritDoc}
     */
    @Override
    protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
        // Creates the listening ServerSocket
        ServerSocketChannel channel = null;
	//如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则
	//通过ServerSocketChannel打开一个socket通道服务者。
        if (selectorProvider != null) {
            channel = selectorProvider.openServerSocketChannel();
        } else {
            channel = ServerSocketChannel.open();
        }
        boolean success = false;
        try {
	    //配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址
            // This is a non blocking socket channel
            channel.configureBlocking(false);
            // Configure the server socket,
            ServerSocket socket = channel.socket();
            // Set the reuseAddress flag accordingly with the setting
            socket.setReuseAddress(isReuseAddress());
            // and bind.
            try {
                socket.bind(localAddress, getBacklog());
            } catch (IOException ioe) {
                // Add some info regarding the address we try to bind to the
                // message
                String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                        + ioe.getMessage();
                Exception e = new IOException(newMessage);
                e.initCause(ioe.getCause());

                // And close the channel
                channel.close();

                throw e;
            }
            // Register the channel within the selector for ACCEPT event
            channel.register(selector, SelectionKey.OP_ACCEPT);
            success = true;
        } finally {
            if (!success) {
                close(channel);
            }
        }
        return channel;
    }

从上来看打开一个socket地址,如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则通过ServerSocketChannel打开一个socket通道服务者;配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址。
来看接收连接:
   /**
     * {@inheritDoc}
     */
    @Override
    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
        SelectionKey key = null;
	//处理serversocketHandler待处理的选择key
        if (handle != null) {
            key = handle.keyFor(selector);
        }
        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }
        // accept the connection from the client
	//接收连接
        SocketChannel ch = handle.accept();
        if (ch == null) {
            return null;
        }
        return new NioSocketSession(this, processor, ch);
    }

从上面来看监听器接受连接,实际上是委托给绑定地址的ServerSocketChannel,接受客户端的连接,产生一个SocketChannel,再根据SocketChannel和Io处理器创建会话。
再来看选择操作:
 
  /**
     * Check if we have at least one key whose corresponding channels is
     * ready for I/O operations.
     *检查通道是否有选择key已就绪读写操作
     * This method performs a blocking selection operation.
     * It returns only after at least one channel is selected,
     * this selector's wakeup method is invoked, or the current thread
     * is interrupted, whichever comes first.
     * 
     * @return The number of keys having their ready-operation set updated
     * @throws IOException If an I/O error occurs
     * @throws ClosedSelectorException If this selector is closed
     */
    @Override
    protected int select() throws Exception {
        return selector.select();
    }

从上来看选择操作实际委托给内部选择器。
再来看其他方法
/**
 * {@inheritDoc}
 获取serversocket通道本地地址
 */
@Override
protected SocketAddress localAddress(ServerSocketChannel handle) throws Exception {
    return handle.socket().getLocalSocketAddress();
}
*
 * {@inheritDoc}
 关闭serversocket通道
 */
@Override
protected void close(ServerSocketChannel handle) throws Exception {
    SelectionKey key = handle.keyFor(selector);
    if (key != null) {
        key.cancel();
    }
    handle.close();
}

/**
 * {@inheritDoc}
 唤醒选择器
 */
@Override
protected void wakeup() {
    selector.wakeup();
}
 /**
  * {@inheritDoc}
  获取注册到选择器的ServerSocketChannel
  */
 @Override
 protected Iterator<ServerSocketChannel> selectedHandles() {
     return new ServerSocketChannelIterator(selector.selectedKeys());
 }

 /**
  * Defines an iterator for the selected-key Set returned by the
  * selector.selectedKeys(). It replaces the SelectionKey operator.
  */
 private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
     /** The selected-key iterator */
     private final Iterator<SelectionKey> iterator;

     /**
      * Build a SocketChannel iterator which will return a SocketChannel instead of
      * a SelectionKey.
      * 
      * @param selectedKeys The selector selected-key set
      */
     private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
         iterator = selectedKeys.iterator();
     }

     /**
      * Tells if there are more SockectChannel left in the iterator
      * @return <tt>true</tt> if there is at least one more
      * SockectChannel object to read
      */
     public boolean hasNext() {
         return iterator.hasNext();
     }

     /**
      * Get the next SocketChannel in the operator we have built from
      * the selected-key et for this selector.
      * 
      * @return The next SocketChannel in the iterator
      */
     public ServerSocketChannel next() {
         SelectionKey key = iterator.next();

         if (key.isValid() && key.isAcceptable()) {
             return (ServerSocketChannel) key.channel();
         }

         return null;
     }

     /**
      * Remove the current SocketChannel from the iterator
      */
     public void remove() {
         iterator.remove();
     }
}

总结:
socket监听NioSocketAcceptor,有两个内部变量为选择器selector和选择器提供者selectorProvider。init方法主要工作为打开一个选择器selector。打开一个socket地址,如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则
通过ServerSocketChannel打开一个socket通道服务者;配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址。监听器接受连接,实际上是委托给绑定地址的ServerSocketChannel,接受客户端的连接,产生一个SocketChannel,再根据SocketChannel和Io处理器创建会话。选择唤醒操作实际委托给内部选择器。


附:
//SocketAcceptor
/**
 * {@link IoAcceptor} for socket transport (TCP/IP).  This class
 * handles incoming TCP/IP based socket connections.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public interface SocketAcceptor extends IoAcceptor {
    /**
     * @return the local InetSocketAddress which is bound currently.  If more than one
     * address are bound, only one of them will be returned, but it's not
     * necessarily the firstly bound address.
     * This method overrides the {@link IoAcceptor#getLocalAddress()} method.
     */
    @Override
    InetSocketAddress getLocalAddress();

    /**
     * @return a {@link Set} of the local InetSocketAddress which are bound currently.
     * This method overrides the {@link IoAcceptor#getDefaultLocalAddress()} method.
     */
    @Override
    InetSocketAddress getDefaultLocalAddress();

    /**
     * Sets the default local InetSocketAddress to bind when no argument is specified in
     * {@link #bind()} method. Please note that the default will not be used
     * if any local InetSocketAddress is specified.
     * This method overrides the {@link IoAcceptor#setDefaultLocalAddress(java.net.SocketAddress)} method.
     * 
     * @param localAddress The local address
     */
    void setDefaultLocalAddress(InetSocketAddress localAddress);

    /**
     * @see ServerSocket#getReuseAddress()
     * 
     * @return <tt>true</tt> if the <tt>SO_REUSEADDR</tt> is enabled
     */
    boolean isReuseAddress();

    /**
     * @see ServerSocket#setReuseAddress(boolean)
     * 
     * @param reuseAddress tells if the <tt>SO_REUSEADDR</tt> is to be enabled
     */
    void setReuseAddress(boolean reuseAddress);

    /**
     * @return the size of the backlog.
     */
    int getBacklog();

    /**
     * Sets the size of the backlog.  This can only be done when this
     * class is not bound
     * 
     * @param backlog The backlog's size
     */
    void setBacklog(int backlog);

    /**
     * @return the default configuration of the new SocketSessions created by 
     * this acceptor service.
     */
    @Override
    SocketSessionConfig getSessionConfig();
}