IXHONG 阅读(442) 评论(0)

本文转载自http://shift-alt-ctrl.iteye.com/blog/1840411

 

Selector本身为抽象类,AbstractSelector是Selector类的抽象实现类,具体的实现类更加底层(SelectorImpl,位于sun.nio.ch);Selector即为"选择器",支撑了NIO的多路复用.Selector不能直接创建,需要通过Selector.open()获得,该方法将使用系统默认的选择器提供者创建新的选择器(SelectorProvider),可以通过选择器的close方法关闭它.

通过SelectionKey对象来表示可选择通道(SelectableChannel)到选择器的注册.SelectionKey由Selector维护.

通常一个Channel只会被注册到一个Selector,一个Selector可以“监测”多个Channel;事实上Channel可以注册在任意一个Selector上,ServerSocketChannel和SocketChannel可以共用一个Selector,也可以各自使用不同的。

 

一.Selector维护三种选择键:

  1.  keys:保存了所有已注册且没有cancel的选择键,Set类型.可以通过selector.keys()获取
  2. selectedKeys:已选择键集,即前一次操作期间,已经准备就绪的通道所对应的选择键.此集合为keys的子集.通过selector.selectedKeys()获取.
  3. canceledKeys:已取消键.已经被取消但尚未取消注册(deregister)的选择键.此集合不可被访问.为keys()的子集.

对于新的Selector实例,上述三个集合均为空.通过channel.register将导致keys()集合被添加.如果某个selectionKey.cancel()被调用,那么此key将会被添加到canceldKey集合中,在下一次selector选择期间,如果canceldKeys不为空,将会导致触发此key的deregister操作(释放资源,并从keys中移除).无论通过channel.close()还是通过selectionKey.cancel(),都会导致key被加入到cannceldKey中.

 

每次选择操作期间(select),都可以将key添加到selectedKeys中或者将从canceledKey中移除.负责"选择"key的操作为select(),selectNow(),select(long timeout);"选择操作"执行的步骤:

  1. 首先对cancelledKey进行清除,遍历"已取消键集合",并对每个key进行deregister操作,然后从"已取消键"集合中删除.从keys集合中删除,从"已选择键"中删除.
  2. 将由更底层的操作来查询操作系统中每个channel的 准备就绪信息.如果该通道的key尚未在selectedKeys中存在,则将其添加到该集合中.如果该通道的key已经存在selectedKeys中,则修改器opts事件集."选择"通道的就绪信息,在底层是由"可扩展尺寸"的线程池执行,但是在并发较高的IO操作中,这仍然会存在"select"延迟问题.
  3. "选择"操作结束后,再次执行1),已防止在"选择"期间,有些keys被取消.

select是个被同步的过程(将对keys,selectedKey都会同步).多线程调用会被阻塞.

二.并发性:

Selector可以在多线程环境中使用,但是其各种键集合并非是线程安全的.

Selector本身对各种key集合的操作,都是同步的,当然为了避免死锁问题,其同步的顺序也是一致的.比如在执行select操作其他,其他线程register,将会阻塞.可以在任意时刻关闭通道或者取消键.因为select操作并未对cancelledKey同步,因此有可能再selectedKey中出现的key是已经被取消的.这一点需要注意.需要校验:key.isValid() && key.isReadable()...

 

三.select()和select(long)是阻塞操作:

  • 通过selector.wakeup()可以将其返回
  • 通过调用selector.close()
  • 调用已阻塞线程的interrupt方法.

注意selector.close()方法也是对keys和selectedKeys进行了同步.一般情况下,keys()和selectedKeys在多线程环境中使用是不安全的.

事实上selector.keys()返回的结果是一个Collections.unmodifiableSet(keys),其中keys参数为selector内部维持的集合.由此可见keys集合是只读的.

selector.selectedKeys()返回的结果是一个Util.ungrowableSet(selectedKeys),其中参数为selector内部维持的集合.由此可见selectedKeys是无法被外部进行add操作的,

但是可以remove以及进行iterator操作.

(参见sun.nio.ch.SelectorImpl,源码来自http://javasourcecode.org/html/open-source/jdk/jdk-6u23/sun/nio/ch/SelectorImpl.java.html)

 

四.方法列表:

  •  public static Selector open() throws IOException:打开一个选择器,此操作将会导致系统默认的SelectorProvider对象的openSelector方法来创建选择器.(此后介绍在各个OS下provider特性)
  • public abstract boolean isOpen():检测此selector是否处于打开状态,仅当selector创建成功却没有关闭,返回true.
  • public abstract Selt<SelectionKey> keys():获取此selector中已经注册(可能已经cancelled但尚未deregister)的所有选择键.此集合不能被外部修改.
  • public abstract Set<SelectionKey> selectedKeys():返回此选择器当前已选择键集合.此集合不能被add,但可以remove操作.
  • public abstract int selectNow() throws IOException:"选择"操作,获取相应的通道已为I/O操作就绪.此方法为非阻塞操作,立即返回已经就绪的的个数.可能为0."非阻塞",意味着,在I/O通道就绪信息的检测上,是无阻塞的,即如果底层线程(底层实现为多线程轮询检测I/O操作,并将已就绪的key放在内部的队列中)所维持的"就绪通道"个数为任意数字都立即返回,当然包括0.因为同一个selector实例中,select(),selectNow(),select(long)底层的实现几乎一致,方法实体中都会对keys和selectedKeys进行同步操作,因此在多线程中同时进行"select"调用,也将会存在互相阻塞的影响.
  • public abstract int select(long timeout) throws IOException:选择一组键,此方法为处于阻塞模式的选择操作.尽在至少一个通道就绪/调用选择器的wakeup/当前线程被interrupt/超时后返回.
  • public abstract int select() throws Exception选择一组键,此方法为阻塞模式的选择操作,直到至少一个通道就绪/wakeup/interrupt时返回.注意Selector支持interrupt(即具有interruptable),它采取了和InterruptableChannel类似的设计思路.即具有begin()/end().
Java代码  
  1. //伪代码如下:  
  2. doSelect(){  
  3.     try{  
  4.         begin();//注册中断操作  
  5.         select();  
  6.     }finaly{  
  7.         end();//解除中断和检测中断.  
  8.     }  
  9. }  

 

在实际底层I/O选择时(阻塞行为),在方法开始前执行begin():

begin所做的事情,就是向Thread注册interruptable对象,参见AbstractSelector.begin();

Java代码  
  1. if (interruptor == null) {  
  2.    interruptor = new Interruptible() {  
  3.    public void interrupt() {  
  4.         AbstractSelector.this.wakeup();//中断回调,主要操作为让当前selector.wakeup(),即在阻塞中返回.  
  5.    }};  
  6. }  
  7. //注册thread.blocker,向Thread.currentThread()中Interruptable blocker属性赋值.以便在thread.interrupt()时能够执行回调.  
  8. AbstractInterruptibleChannel.blockedOn(interruptor);  
  9. //检测当前thread是否已经被终端.  
  10. if (Thread.currentThread().isInterrupted())  
  11.    interruptor.interrupt();  
  12. }  

 

Java代码  
  1. //Thread.interrupt()方法对异步中断的相应操作  
  2. public void interrupt() {  
  3.     if (this != Thread.currentThread())  
  4.         checkAccess();  
  5.   
  6.     synchronized (blockerLock) {  
  7.         Interruptible b = blocker;  
  8.         if (b != null) {  
  9.         interrupt0();       // Just to set the interrupt flag  
  10.         b.interrupt();  
  11.         return;  
  12.         }  
  13.     }  
  14.     interrupt0();  
  15. }  

 

 在实际IO选择返回后,以及异常将会执行end(),end()方法要比Channel的控制简单,直接取消blocker = null.

 现在,你知道Selector中断和唤醒的机制了吗?

  • public abstract Selector wakeup():使尚未返回的选择操作立即返回.如果另一个线程目前正阻塞在select()或者select(long)方法的调用中,则该调用将立即返回.返回后,再次调用select等,将会有可能继续阻塞.多次连续的调用wakeup,效果一致.执行wakeup,将会导致底层Selector实现类的实例的interrupt属性标记为true,然后由JNI调用触发pipe被打断并发回(pipe为selector内部机制,稍后介绍),此后所有在"select"操作上阻塞的线程,依次获取keys和selectedKeys锁,此后将对interrupt属性校验,如果interrupt = true,将会重置pipe(自己给自己建立一个pipe-socket链接),然后再次将interrupt置为false,并从阻塞中返回.由此可见,如果多个线程阻塞,事实上wakeup只能让正在阻塞的一个线程返回.阻塞在select操作上,有2中情况:1)因为keys集合同步阻塞 2) 因为selector IO阻塞...wakeup()方法是让基于pipe IO阻塞的操作返回.但是因为keys同步锁的阻塞它将无能为力.wakeup是一种对底层操作消耗较为严重的操作,需要对此操作的调用频度有所关心.
  • public abstract void close() throws Exception:关闭选择器.如果有其他线程阻塞在此selector的选择操作中,则中断该线程.close()方法内部执行顺序为:
  1. 调用wakeup()方法,使阻塞的线程立即返回.
  2. 关闭selector所关联的底层pipe连接信息.
  3. 对keys和selectedKeys同步,然后遍历此selector所注册是所有的channel(即选择键,底层而言每个channel对应一个选择键),并依次执行selector.degregister(key),deregister操作将导致每个channel都删除自己维持的当前selector引用(从内部的key[]数组中删除).如果当前channel已经关闭,则直接销毁当前channel所关联的所有资源(比如关闭打开的文件描述),如果当前channel为open,则保持资源.
  4. 退出所有的底层"select"事件线程池.

 

五.SelectionKey(选择键)

抽象类,表示selectableChannel在Selector中注册的标识.每个Channel向Selector注册时,都将会创建一个selectionKey.选择键将Channel与Selector建立了关系,并维护了channel事件.可以通过cancel方法取消键,取消的键不会立即从selector中移除,而是添加到cancelledKeys中,在下一次select操作时移除它.所以在调用某个key时,需要使用isValid进行校验.

选择键包含两个操作集,操作集为位运算值,每一位表示一种操作.

  1. interest 集合:当前channel感兴趣的操作,此类操作将会在下一次选择器select操作时被交付,可以通过selectionKey.interestOps(int)进行修改.
  2. ready 集合:表示此选择键上,已经就绪的操作.每次select时,选择器都会对ready集合进行更新;外部程序无法修改此集合.

操作集:

OP_ACCEPT:连接可接受操作,仅ServerSocketChannel支持

OP_CONNECT:连接操作,Client端支持的一种操作

OP_READ/OP_WRITE

 

这些opts都不为0,如果向selector之中register一个为“0”的opts,表示此channel不关注任何类型的事件。(言外之意,register方法只是获取一个selectionKey,具体这个Channel对何种事件感兴趣,可以在稍后操作)

 

方法列表:

  • public abstract SelectableChannel channel():返回此选择键所关联的通道.即使此key已经被取消,仍然会返回.
  • public abstract Selector selector():返回此选择键所关联的选择器,即使此键已经被取消,仍然会返回.
  • public abstract boolean isValid():检测此key是否有效.当key被取消,或者通道被关闭,或者selector被关闭,都将导致此key无效.在AbstractSelector.removeKey(key)中,会导致selectionKey被置为无效.
  • public abstract void cancel():请求将此键取消注册.一旦返回成功,那么该键就是无效的,被添加到selector的cancelledKeys中.cancel操作将key的valid属性置为false,并执行selector.cancel(key)(即将key加入cancelledkey集合)
  • public abstract int interesOps():获得此键的interes集合.
  • public abstract SelectionKey interestOps(int ops):将此键的interst设置为指定值.此操作会对ops和channel.validOps进行校验.如果此ops不会当前channel支持,将抛出异常.
  • public abstract int readyOps():获取此键上ready操作集合.即在当前通道上已经就绪的事件.
  • public final boolean isReadable(): 检测此键是否为"read"事件.等效于:k.,readyOps() & OP_READ != 0;还有isWritable(),isConnectable(),isAcceptable()
  • public final Object attach(Object ob):将给定的对象作为附件添加到此key上.在key有效期间,附件可以在多个ops事件中传递.
  • public final Object attachment():获取附件.一个channel的附件,可以再当前Channel(或者说是SelectionKey)生命周期中共享,但是attachment数据不会作为socket数据在网络中传输.