Donald_Draper 阅读(979) 评论(0)
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
在前文中我们讲过PTP和PUB/SUB模式实例,今天我们来看一下ActiveMQ是如何生产消息的
实例主要生产者代码片段:
ConnectionFactory :连接工厂,JMS 用它创建连接 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  

Connection :JMS 客户端到JMS Provider 的连接 
Connection connection = connectionFactory.createConnection();
 
Connection 启动 
connection.start();  

System.out.println("Connection is start..."); 
Session:一个发送或接收消息的单线程上下文(下面第一个参数为TRUE表示开启事务,此时确认模式参数会被忽略,消息在session.commit()时才真正提交)
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
 
Queue :消息的目的地;消息发送给谁. 
Queue  destination = session.createQueue(qname);  

MessageProducer:消息发送者
MessageProducer producer = session.createProducer(destination); 

设置持久化,此处学习,实际根据项目决定 
producer.setDeliveryMode(DeliveryMode.PERSISTENT); 

构造消息,此处写死,项目就是参数,或者方法获取 
sendMessage(session, producer);  
session.commit();  
connection.close(); 

我们依次来看上面的连接工厂,连接,会话,消息队列和订阅主题,生产者及发送消息
1.连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); 
//ActiveMQConnectionFactory
 /**
  * Excerpt of ActiveMQ's JMS connection factory: its configuration fields and the static
  * initializer that resolves the default broker host, port and URLs.
  *
  * <p>The static initializer reads system properties under
  * {@code AccessController.doPrivileged} and falls back to {@code localhost} and
  * {@code 61616} when nothing usable is configured.
  */
 public class ActiveMQConnectionFactory extends JNDIBaseStorable
     implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable
 {
    private static final Logger LOG;
    private static final String DEFAULT_BROKER_HOST; // default broker host
    private static final int DEFAULT_BROKER_PORT; // default broker port
    public static final String DEFAULT_BROKER_BIND_URL; // default broker bind URL
    public static final String DEFAULT_BROKER_URL; // default broker URL ("failover://" + bind URL)
    public static final String DEFAULT_USER = null; // default user name
    public static final String DEFAULT_PASSWORD = null; // default password
    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; // default producer window size
    protected URI brokerURL; // broker URL
    protected String userName; // user name
    protected String password; // password
    protected String clientID; // client id
    protected boolean dispatchAsync;
    protected boolean alwaysSessionAsync;
    JMSStatsImpl factoryStats;
    private IdGenerator clientIdGenerator;
    private String clientIDPrefix;
    private IdGenerator connectionIdGenerator;
    private String connectionIDPrefix; // connection id prefix
    private ActiveMQPrefetchPolicy prefetchPolicy;
    private RedeliveryPolicyMap redeliveryPolicyMap;
    private BlobTransferPolicy blobTransferPolicy;
    private MessageTransformer transformer;
    private boolean disableTimeStampsByDefault;
    private boolean optimizedMessageDispatch;
    private long optimizeAcknowledgeTimeOut;
    private long optimizedAckScheduledAckInterval;
    private boolean copyMessageOnSend; // whether messages are copied on send
    private boolean useCompression; // whether messages are compressed
    private boolean objectMessageSerializationDefered;
    private boolean useAsyncSend; // whether messages are sent asynchronously
    private boolean optimizeAcknowledge;
    private int closeTimeout; // connection close timeout
    private boolean useRetroactiveConsumer;
    private boolean exclusiveConsumer;
    private boolean nestedMapAndListEnabled;
    private boolean alwaysSyncSend; // whether sends are always synchronous
    private boolean watchTopicAdvisories;
    private int producerWindowSize; // producer window size
    private long warnAboutUnstartedConnectionTimeout;
    private int sendTimeout;
    private boolean sendAcksAsync; // whether acknowledgements are sent asynchronously
    private TransportListener transportListener; // transport listener
    private ExceptionListener exceptionListener;
    private int auditDepth;
    private int auditMaximumProducerNumber;
    private boolean useDedicatedTaskRunner;
    // Per the article author: how long a consumer waits before redelivery after a failover
    // (e.g. a new master being elected). NOTE(review): confirm against the ActiveMQ docs.
    private long consumerFailoverRedeliveryWaitPeriod;
    private boolean checkForDuplicates;
    private ClientInternalExceptionListener clientInternalExceptionListener; // client-internal exception listener
    private boolean messagePrioritySupported; // whether message priority is supported
    private boolean transactedIndividualAck;
    private boolean nonBlockingRedelivery; // whether redelivery is non-blocking
    private int maxThreadPoolSize; // maximum thread pool size
    private TaskRunnerFactory sessionTaskRunner; // task runner factory used by sessions
    private RejectedExecutionHandler rejectedTaskHandler;
    protected int xaAckMode;
    private boolean rmIdFromConnectionId;
    private boolean consumerExpiryCheckEnabled; // whether the consumer expiry check is enabled

    static
    {
        LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class);
        String host = null;
        String port = null;
        try
        {
            // Both lookups share one try block: a failure on the host lookup skips the port lookup.
            host = lookupProperty("org.apache.activemq.AMQ_HOST", "AMQ_HOST", "localhost");
            port = lookupProperty("org.apache.activemq.AMQ_PORT", "AMQ_PORT", "61616");
        }
        catch(Throwable e)
        {
            LOG.debug("Failed to look up System properties for host and port", e);
        }
        // Fall back to localhost:61616 when the lookup failed or produced an empty value.
        host = host != null && !host.isEmpty() ? host : "localhost";
        port = port != null && !port.isEmpty() ? port : "61616";
        DEFAULT_BROKER_HOST = host;
        DEFAULT_BROKER_PORT = Integer.parseInt(port);
        String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
        String bindURL = null;
        try
        {
            bindURL = lookupProperty("org.apache.activemq.BROKER_BIND_URL", "BROKER_BIND_URL", defaultURL);
        }
        catch(Throwable e)
        {
            LOG.debug("Failed to look up System properties for host and port", e);
        }
        // Default bind URL is tcp://<host>:<port>, i.e. tcp://localhost:61616 out of the box.
        bindURL = bindURL != null && !bindURL.isEmpty() ? bindURL : defaultURL;
        DEFAULT_BROKER_BIND_URL = bindURL;
        // Default broker URL is failover://tcp://<host>:<port>.
        DEFAULT_BROKER_URL = "failover://" + DEFAULT_BROKER_BIND_URL;
    }

    /**
     * Reads a system property inside a privileged action, preferring the fully qualified key.
     *
     * @param qualifiedKey key tried first, e.g. {@code org.apache.activemq.AMQ_HOST}
     * @param shortKey key consulted when the qualified key is unset or empty
     * @param fallback value returned when the short key is not set either
     * @return the resolved value (may be empty if the short key is set to an empty string)
     */
    private static String lookupProperty(final String qualifiedKey, final String shortKey, final String fallback)
    {
        return AccessController.doPrivileged(new PrivilegedAction<String>() {
            @Override
            public String run()
            {
                String result = System.getProperty(qualifiedKey);
                return result != null && !result.isEmpty() ? result : System.getProperty(shortKey, fallback);
            }
        });
    }
 }

从上面可以看出,ActiveMQConnectionFactory的属性主要有broker url、用户名和密码、是否压缩消息、是否异步发送消息、
是否支持消息优先级、是否非阻塞重发、最大线程数、生产者窗口大小等;
静态语句块从系统属性中获取host、port和broker绑定地址,如果系统属性中不存在相关配置,则赋予默认值(localhost、61616)。


再来看ActiveMQConnectionFactory构造:
 /**
  * Creates a factory with default settings, then applies the given credentials and broker URL.
  *
  * @param userName user name forwarded to {@code setUserName}
  * @param password password forwarded to {@code setPassword}
  * @param brokerURL broker URL string forwarded to {@code setBrokerURL}
  */
 public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
    {
        // Collaborators and policies (created in the same order as before).
        this.factoryStats = new JMSStatsImpl();
        this.prefetchPolicy = new ActiveMQPrefetchPolicy();
        this.redeliveryPolicyMap = new RedeliveryPolicyMap();
        this.redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
        this.blobTransferPolicy = new BlobTransferPolicy();
        this.rejectedTaskHandler = null;

        // Dispatch and send behaviour.
        this.dispatchAsync = true; // asynchronous dispatch by default
        this.alwaysSessionAsync = true;
        this.optimizedMessageDispatch = true;
        this.copyMessageOnSend = true;
        this.nestedMapAndListEnabled = true;
        this.watchTopicAdvisories = true;
        this.producerWindowSize = 0; // producer window size
        this.sendTimeout = 0; // send timeout
        this.sendAcksAsync = true; // acknowledgements are sent asynchronously

        // Acknowledgement tuning and timeouts.
        this.optimizeAcknowledgeTimeOut = 300L;
        this.optimizedAckScheduledAckInterval = 0L;
        this.closeTimeout = 15000; // close timeout
        this.warnAboutUnstartedConnectionTimeout = 500L;
        this.consumerFailoverRedeliveryWaitPeriod = 0L;

        // Duplicate detection.
        this.auditDepth = 2048;
        this.auditMaximumProducerNumber = 64;
        this.checkForDuplicates = true;

        // Features that are off by default.
        this.messagePrioritySupported = false; // message priority is not supported by default
        this.transactedIndividualAck = false;
        this.nonBlockingRedelivery = false; // redelivery is blocking by default
        this.rmIdFromConnectionId = false;

        // Remaining defaults.
        this.maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE; // maximum thread pool size
        this.xaAckMode = -1;
        this.consumerExpiryCheckEnabled = true;

        // Apply the caller-supplied credentials and broker URL last.
        setUserName(userName);
        setPassword(password);
        setBrokerURL(brokerURL);
    }

从构造可以看出,主要初始化为broker url,用户密码,是否压缩、异步发送消息、
支持消息优先级、非阻塞传输,最大线程数,生产窗口大小等属性;

下面来看一下连接的获取
2.连接
Connection connection = connectionFactory.createConnection();  
//ActiveMQConnectionFactory
 /**
  * Creates a connection using the credentials configured on this factory.
  *
  * @return the new connection, as produced by {@code createActiveMQConnection()}
  * @throws JMSException if the connection cannot be created
  */
 public Connection createConnection()
        throws JMSException
    {
        final ActiveMQConnection connection = createActiveMQConnection();
        return connection;
    }
 /**
  * Creates a connection with this factory's stored user name and password.
  *
  * @return the new connection
  * @throws JMSException if the connection cannot be created
  */
 protected ActiveMQConnection createActiveMQConnection()
        throws JMSException
    {
        return createActiveMQConnection(this.userName, this.password);
    }
    //最终ActiveMQ创建连接方法,返回的为ActiveMQConnection
     protected ActiveMQConnection createActiveMQConnection(String userName, String password)
        throws JMSException
    {
        ActiveMQConnection connection;
        if(brokerURL == null)
            throw new ConfigurationException("brokerURL not set.");
        connection = null;
	//创建transport
        Transport transport = createTransport();
	//创建连接
        connection = createActiveMQConnection(transport, factoryStats);
	//设置连接用户密码
        connection.setUserName(userName);
        connection.setPassword(password);
	//配置连接
        configureConnection(connection);
	//启动transport
        transport.start();
        if(clientID != null)
            connection.setDefaultClientID(clientID);
        return connection;
        catch(Throwable throwable1) { }
        throw JMSExceptionSupport.create((new StringBuilder()).append("Could not connect to broker URL: ").append(brokerURL).append(". Reason: ").append(e).toString(), e);
    }

我们分别来看以上几点
a.创建transport
Transport transport = createTransport();

protected Transport createTransport()
        throws JMSException
    {
       //由TransportFactory工厂连接brokerURL创建Transport
        return TransportFactory.connect(brokerURL);
        Exception e;
        e;
        throw JMSExceptionSupport.create((new StringBuilder()).append("Could not create Transport. Reason: ").append(e).toString(), e);
    }

再看TransportFactory
//TransportFactory
/**
 * Excerpt of the base transport factory: the static lookup tables and the {@code connect}
 * entry point.
 */
public abstract class TransportFactory
{
    /** Finder rooted at the transport descriptor directory on the classpath. */
    private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
    /** Finder rooted at the wire-format descriptor directory on the classpath. */
    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
    /** Cache of transport factories, keyed by the broker URI scheme. */
    private static final ConcurrentMap TRANSPORT_FACTORYS = new ConcurrentHashMap();
    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
    private static final String THREAD_NAME_FILTER = "threadName";

    /**
     * Connects to the given broker location.
     *
     * @param location broker URI; the matching factory is resolved by {@code findTransportFactory}
     * @return the transport produced by that factory's {@code doConnect}
     * @throws Exception if the factory cannot be found or the connect fails
     */
    public static Transport connect(URI location)
        throws Exception
    {
        return findTransportFactory(location).doConnect(location);
    }
}

//从TRANSPORT_FACTORYS获取brokerURI协议对应的TransportFactory;如果没有,
//则由TRANSPORT_FACTORY_FINDER根据brokerURI的协议创建对应的TransportFactory实例,并放入TRANSPORT_FACTORYS中
/**
 * Resolves the transport factory for a broker URI, caching it by scheme.
 *
 * @param location broker URI whose scheme selects the factory
 * @return the cached factory, or a newly created one that is then cached
 * @throws IOException if the URI has no scheme or no factory can be created for it
 */
public static TransportFactory findTransportFactory(URI location)
        throws IOException
    {
        final String scheme = location.getScheme();
        if(scheme == null)
        {
            throw new IOException("Transport not scheme specified: [" + location + "]");
        }
        final TransportFactory cached = (TransportFactory)TRANSPORT_FACTORYS.get(scheme);
        if(cached != null)
        {
            return cached;
        }
        try
        {
            // Not cached yet: let the finder instantiate the factory registered for this scheme.
            final TransportFactory created = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
            TRANSPORT_FACTORYS.put(scheme, created);
            return created;
        }
        catch(Throwable e)
        {
            throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
        }
    }

从FactoryFinder,可以看到有一个META-INF/services/org/apache/activemq/transport/路径
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
找到这个路径,可以看到有一些文本文件http,nio,amqp,failover,ssl,tcp,vm等;
tcp这个文件就是我们要找的,打开文件内容如下:
class=org.apache.activemq.transport.tcp.TcpTransportFactory
TcpTransportFactory看到这个想到了什么,就是我们要找的,FactoryFinder就是根据
BrokerURI的协议来找META-INF/services/org/apache/activemq/transport/下的对应文件,
然后加载class属性对应的类,由于我们的broker地址,为tcp://192.168.126.128:61616,所以协议为TCP
对应的为TcpTransportFactory,好了到这里TransportFactory工厂创建完毕。这里只是猜测
//回看一下FactoryFinder
/**
 * Instantiates classes named in descriptor files on the classpath.
 *
 * <p>A descriptor is a properties file located at {@code path + key}; its {@code class}
 * property names the class to load and instantiate.
 */
public class FactoryFinder
{
    private static ObjectFactory objectFactory = new StandaloneObjectFactory();
    private final String path;

    /**
     * @param path resource prefix under which descriptors are looked up, e.g.
     *        {@code META-INF/services/org/apache/activemq/transport/}
     */
    public FactoryFinder(String path)
    {
        this.path = path;
    }

    /**
     * Creates a new instance of the class described by the descriptor {@code path + key}.
     *
     * @param key descriptor name appended to this finder's path (e.g. a URI scheme)
     * @return a fresh instance of the described class
     */
    public Object newInstance(String key)
        throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException
    {
        return objectFactory.create(path + key);
    }

    /** Default object factory: caches the resolved class per descriptor path. */
    protected static class StandaloneObjectFactory
        implements ObjectFactory
    {
        // Maps a descriptor path to the class loaded for it.
        final ConcurrentMap classMap = new ConcurrentHashMap();

        /**
         * Returns a new instance of the class described at {@code path}, loading and caching
         * the class on first use.
         */
        public Object create(String path)
            throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException
        {
            Class clazz = (Class)classMap.get(path);
            if(clazz == null)
            {
                clazz = loadClass(loadProperties(path));
                classMap.put(path, clazz);
            }
            return clazz.newInstance();
        }

        /**
         * Loads the class named by the {@code class} property, trying the thread context
         * class loader first and this class's loader second.
         *
         * @throws IOException if the {@code class} property is missing
         * @throws ClassNotFoundException if neither class loader can load the class
         */
        public static Class loadClass(Properties properties)
            throws ClassNotFoundException, IOException
        {
            String className = properties.getProperty("class");
            if(className == null)
                throw new IOException("Expected property is missing: class");
            Class clazz = null;
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            if(loader != null)
                try
                {
                    clazz = loader.loadClass(className);
                }
                catch(ClassNotFoundException ignored)
                {
                    // Fall through and retry with this class's own loader.
                }
            if(clazz == null)
                clazz = FactoryFinder.class.getClassLoader().loadClass(className);
            return clazz;
        }

        /**
         * Loads the descriptor properties file found at {@code uri} on the classpath.
         *
         * @throws IOException if the resource cannot be found or read
         */
        public static Properties loadProperties(String uri)
            throws IOException
        {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            if(classLoader == null)
                classLoader = StandaloneObjectFactory.class.getClassLoader();
            InputStream in = classLoader.getResourceAsStream(uri);
            if(in == null)
            {
                in = FactoryFinder.class.getClassLoader().getResourceAsStream(uri);
                if(in == null)
                    throw new IOException("Could not find factory class for resource: " + uri);
            }
            BufferedInputStream reader = new BufferedInputStream(in);
            try
            {
                Properties properties = new Properties();
                properties.load(reader);
                return properties;
            }
            finally
            {
                try
                {
                    reader.close();
                }
                catch(IOException ignored)
                {
                    // A close failure must not hide the load result or the primary exception.
                }
            }
        }
    }
}

再看从TransportFactory工厂获取连接:
//获取连接
return tf.doConnect(location);

TransportFactory获取连接
  public Transport doConnect(URI location)
        throws Exception
    {
        Transport rc;
        Map options = new HashMap(URISupport.parseParameters(location));
        if(!options.containsKey("wireFormat.host"))
            options.put("wireFormat.host", location.getHost());
        WireFormat wf = createWireFormat(options);
	创建transport
        Transport transport = createTransport(location, wf);
	配置transport
        rc = configure(transport, wf, options);
        return rc;
    }

    //待父类扩展
    protected Transport createTransport(URI location, WireFormat wf)
        throws MalformedURLException, UnknownHostException, IOException
    {
        throw new IOException("createTransport() method not implemented!");
    }

再看其子类TcpTransportFactory
public class TcpTransportFactory extends TransportFactory
{
  protected Transport createTransport(URI location, WireFormat wf)
        throws UnknownHostException, IOException
    {
        URI localLocation = null;
        String path = location.getPath();
        if(path != null && path.length() > 0)
        {
            int localPortIndex = path.indexOf(':');
            try
            {
                Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
                String localString = (new StringBuilder()).append(location.getScheme()).append(":/").append(path).toString();
                localLocation = new URI(localString);
            }
        }
	//创建socket的工场
        SocketFactory socketFactory = createSocketFactory();
	//创建TcpTransport
        return createTcpTransport(wf, socketFactory, location, localLocation);
    }
}

//创建socket的工厂
SocketFactory socketFactory = createSocketFactory();
 /**
  * Supplies the socket factory used for new TCP transports.
  *
  * @return the result of {@code SocketFactory.getDefault()}
  * @throws IOException declared for overriding implementations; not thrown here
  */
 protected SocketFactory createSocketFactory()
        throws IOException
    {
        final SocketFactory defaultFactory = SocketFactory.getDefault();
        return defaultFactory;
    }
//DefaultSocketFactory
public abstract class SocketFactory
{
    /**
     * Returns the shared default socket factory, creating a {@code DefaultSocketFactory}
     * on first use while holding the {@code SocketFactory} class lock.
     */
    public static SocketFactory getDefault()
    {
        synchronized(SocketFactory.class)
        {
            if(theFactory == null)
                theFactory = new DefaultSocketFactory();
        }
        return theFactory;
    }

//创建TcpTransport,socketFactory为DefaultSocketFactory
return createTcpTransport(wf, socketFactory, location, localLocation);
 /**
  * Instantiates the TCP transport.
  *
  * @param wf wire format for the transport
  * @param socketFactory factory the transport uses to create its socket
  * @param location remote broker URI
  * @param localLocation local URI, or {@code null} when none was derived
  * @return a new {@code TcpTransport}
  */
 protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation)
        throws UnknownHostException, IOException
    {
        final TcpTransport tcpTransport = new TcpTransport(wf, socketFactory, location, localLocation);
        return tcpTransport;
    }
//TcpTransport
public class TcpTransport extends TransportThreadSupport
    implements Transport, Service, Runnable
{
   protected final URI remoteLocation;//远程URI
    protected final URI localLocation;//本地URI
    protected final WireFormat wireFormat;
    protected int connectionTimeout;//连接超时时间
    protected int soTimeout;
    protected int socketBufferSize;//socket的缓存大小
    protected int ioBufferSize;
    protected boolean closeAsync;
    protected Socket socket;
    protected DataOutputStream dataOut;//socket的输出流
    protected DataInputStream dataIn;//socket的输入流
    protected TimeStampStream buffOut;
    protected int trafficClass;
    private boolean trafficClassSet;
    protected boolean diffServChosen;
    protected boolean typeOfServiceChosen;
    protected boolean trace;
    protected String logWriterName;
    protected boolean dynamicManagement;
    protected boolean startLogging;
    protected int jmxPort;
    protected boolean useLocalHost;
    protected int minmumWireFormatVersion;
    protected SocketFactory socketFactory;
    protected final AtomicReference stoppedLatch;
    protected volatile int receiveCounter;
    private Map socketOptions;
    private int soLinger;
    private Boolean keepAlive;//是否保活
    private Boolean tcpNoDelay;//tcp是否为非延时
    private Thread runnerThread;
    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation)
        throws UnknownHostException, IOException
    {
        connectionTimeout = 30000;
        socketBufferSize = 65536;
        ioBufferSize = 8192;
        closeAsync = true;
        buffOut = null;
        trafficClass = 0;
        trafficClassSet = false;
        diffServChosen = false;
        typeOfServiceChosen = false;
        trace = false;
        logWriterName = TransportLoggerSupport.defaultLogWriterName;
        dynamicManagement = false;
        startLogging = true;
        jmxPort = 1099;
        useLocalHost = false;
        stoppedLatch = new AtomicReference();
        soLinger = -2147483648;
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;
        try
        {
	    //默认SOCKET工厂创建socket
            socket = socketFactory.createSocket();
        }
        this.remoteLocation = remoteLocation;
        this.localLocation = localLocation;
        setDaemon(false);
    }
}

//DefaultSocketFactory
/**
 * Excerpt of the default socket factory: only the no-argument {@code createSocket} is shown.
 */
class DefaultSocketFactory extends SocketFactory
{
    DefaultSocketFactory()
    {
        super();
    }

    /** Returns a new {@code Socket} built with its no-argument constructor. */
    @Override
    public Socket createSocket()
    {
        final Socket unconnected = new Socket();
        return unconnected;
    }
}

配置transport
rc = configure(transport, wf, options);
 /**
  * Wraps the raw transport in the standard filter chain.
  *
  * @param transport transport to decorate
  * @param wf wire format, passed on to {@code compositeConfigure}
  * @param options transport options, passed on to {@code compositeConfigure}
  * @return a {@code ResponseCorrelator} wrapping a {@code MutexTransport} wrapping the
  *         result of {@code compositeConfigure}
  */
 public Transport configure(Transport transport, WireFormat wf, Map options)
        throws Exception
    {
        // Apply the options (including the optional write timeout filter).
        final Transport configured = compositeConfigure(transport, wf, options);
        // Add the MutexTransport layer.
        final Transport mutexed = new MutexTransport(configured);
        // Add the ResponseCorrelator layer on top.
        return new ResponseCorrelator(mutexed);
    }
    /**
     * Applies the options to the transport, adding a write timeout filter when
     * {@code soWriteTimeout} is present.
     *
     * @param transport transport to configure
     * @param format wire format (not used by this implementation)
     * @param options option map; {@code soWriteTimeout} is removed from it when present
     * @return the transport, wrapped in a {@code WriteTimeoutFilter} when a timeout key was given
     */
    public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
    {
        Transport result = transport;
        if(options.containsKey("soWriteTimeout"))
        {
            final WriteTimeoutFilter timeoutFilter = new WriteTimeoutFilter(transport);
            final String soWriteTimeout = (String)options.remove("soWriteTimeout");
            if(soWriteTimeout != null)
            {
                timeoutFilter.setWriteTimeout(Long.parseLong(soWriteTimeout));
            }
            result = timeoutFilter;
        }
        IntrospectionSupport.setProperties(result, options);
        return result;
    }

//MutexTransport,添加锁机制
public class MutexTransport extends TransportFilter
{
    private final ReentrantLock writeLock;
    private boolean syncOnCommand;
    public MutexTransport(Transport next)
    {
        super(next);
        writeLock = new ReentrantLock();
        syncOnCommand = false;
    }
    public void onCommand(Object command)
    {
        if(!syncOnCommand)
            break MISSING_BLOCK_LABEL_47;
        writeLock.lock();
        transportListener.onCommand(command);
        writeLock.unlock();
        break MISSING_BLOCK_LABEL_57;
        Exception exception;
        exception;
        writeLock.unlock();
        throw exception;
        transportListener.onCommand(command);
    }
    ...
}
//ResponseCorrelator,请求协调器
/**
 * Excerpt of the transport filter that stamps outgoing commands with sequence ids.
 * Several method bodies are elided in this listing.
 */
public class ResponseCorrelator extends TransportFilter
{
   private final Map requestMap;
    private IntSequenceGenerator sequenceGenerator;
    private final boolean debug;
    private IOException error;
    /**
     * Delegates to a two-argument constructor (not shown in this excerpt) with a fresh
     * {@code IntSequenceGenerator}.
     */
    public ResponseCorrelator(Transport next)
    {
        this(next, new IntSequenceGenerator());
    }
    /**
     * Sends a command without asking for a response: assigns the next sequence id, marks
     * the command as not requiring a response and passes it to the wrapped transport.
     */
    public void oneway(Object o)
        throws IOException
    {
        Command command = (Command)o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(false);
        next.oneway(command);
    }

    // Body elided in the article.
    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback)
        throws IOException
    {
       ...
    }

    // Body elided in the article.
    public Object request(Object command)
        throws IOException
    {
        .....
    }


    // Body elided in the article.
    public void onCommand(Object o)
    {
       ....
    }
}

小结一下创建transport的过程:
TransportFactory创建Transport时,先从TransportFactory的ConcurrentHashMap<BrokerURI.getScheme(),TransportFactory>中
获取brokerURI协议对应的TransportFactory;如果没有,则由FactoryFinder根据协议名加载对应的类并创建实例,再由该工厂创建并配置Transport。

b.创建连接

从上面看到transport由TransportFactory创建(TcpTransport经MutexTransport、ResponseCorrelator包装),factoryStats为JMSStatsImpl
先看一下JMSStatsImpl
/**
 * Factory-level statistics holder that tracks the connections registered with it.
 */
public class JMSStatsImpl extends StatsImpl
{
    private List connections;

    /** Starts with an empty {@code CopyOnWriteArrayList} of connections. */
    public JMSStatsImpl()
    {
        connections = new CopyOnWriteArrayList();
    }

    /**
     * @return the connection statistics of every registered connection, in list order
     */
    public JMSConnectionStatsImpl[] getConnections()
    {
        final Object[] snapshot = connections.toArray();
        final JMSConnectionStatsImpl[] answer = new JMSConnectionStatsImpl[snapshot.length];
        int index = 0;
        for(Object element : snapshot)
        {
            answer[index++] = ((ActiveMQConnection)element).getConnectionStats();
        }
        return answer;
    }

    /** Registers a connection. */
    public void addConnection(ActiveMQConnection connection)
    {
        connections.add(connection);
    }

    /** Unregisters a connection. */
    public void removeConnection(ActiveMQConnection connection)
    {
        connections.remove(connection);
    }

    /** Prints a {@code factory { ... }} block containing the dump of every connection. */
    public void dump(IndentPrinter out)
    {
        out.printIndent();
        out.println("factory {");
        out.incrementIndent();
        for(JMSConnectionStatsImpl connectionStat : getConnections())
        {
            connectionStat.dump(out);
        }
        out.decrementIndent();
        out.printIndent();
        out.println("}");
        out.flush();
    }

    /** Sets the enabled flag on this object and on every connection's statistics. */
    public void setEnabled(boolean enabled)
    {
        super.setEnabled(enabled);
        for(JMSConnectionStatsImpl connectionStats : getConnections())
        {
            connectionStats.setEnabled(enabled);
        }
    }

}

从上面可以看出,JMSStatsImpl用于管理各个ActiveMQConnection连接的统计信息
回到
connection = createActiveMQConnection(transport, factoryStats);

//ActiveMQConnectionFactory
/**
 * Instantiates the connection object for a transport.
 *
 * @param transport transport the connection will use
 * @param stats factory statistics the connection is constructed with
 * @return a new {@code ActiveMQConnection} using this factory's id generators
 */
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats)
        throws Exception
    {
        return new ActiveMQConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
    }

//ActiveMQConnection
/**
 * Excerpt of the ActiveMQ JMS connection: its fields and the static initializer that
 * copies the default user, password and broker URL from {@code ActiveMQConnectionFactory}.
 */
public class ActiveMQConnection
    implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection
{
    // Default user, password and broker URL, plus the default thread pool size.
    public static final String DEFAULT_USER;
    public static final String DEFAULT_PASSWORD;
    public static final String DEFAULT_BROKER_URL;
    public static int DEFAULT_THREAD_POOL_SIZE = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
    // Active temporary destinations.
    public final ConcurrentMap activeTempDestinations = new ConcurrentHashMap();
    protected boolean dispatchAsync; // whether messages are dispatched asynchronously
    protected boolean alwaysSessionAsync;
    private TaskRunnerFactory sessionTaskRunner; // task runner factory used by sessions
    private final ThreadPoolExecutor executor; // connection executor
    private final ConnectionInfo info; // connection info
    private ExceptionListener exceptionListener;
    private ClientInternalExceptionListener clientInternalExceptionListener;
    private boolean clientIDSet;
    private boolean isConnectionInfoSentToBroker;
    private boolean userSpecifiedClientID;
    private ActiveMQPrefetchPolicy prefetchPolicy;
    private BlobTransferPolicy blobTransferPolicy;
    private RedeliveryPolicyMap redeliveryPolicyMap;
    private MessageTransformer transformer;
    private boolean disableTimeStampsByDefault;
    private boolean optimizedMessageDispatch;
    private boolean copyMessageOnSend;
    private boolean useCompression; // whether messages are compressed
    private boolean objectMessageSerializationDefered;
    private boolean useAsyncSend; // whether messages are sent asynchronously
    private boolean optimizeAcknowledge;
    private long optimizeAcknowledgeTimeOut;
    private long optimizedAckScheduledAckInterval;
    private boolean nestedMapAndListEnabled;
    private boolean useRetroactiveConsumer;
    private boolean exclusiveConsumer;
    private boolean alwaysSyncSend;
    private int closeTimeout; // connection close timeout
    private boolean watchTopicAdvisories;
    private long warnAboutUnstartedConnectionTimeout;
    private int sendTimeout; // send timeout
    private boolean sendAcksAsync; // whether acknowledgements are sent asynchronously
    private boolean checkForDuplicates;
    private boolean queueOnlyConnection; // whether this is a queue-only connection
    private boolean consumerExpiryCheckEnabled;
    private final Transport transport; // underlying transport
    private final IdGenerator clientIdGenerator;
    private final JMSStatsImpl factoryStats; // factory-level statistics this connection registers with
    private final JMSConnectionStatsImpl stats;
    // Lifecycle flags.
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
    // Sessions, connection consumers and transport listeners.
    private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList();
    private final CopyOnWriteArrayList connectionConsumers = new CopyOnWriteArrayList();
    private final CopyOnWriteArrayList transportListeners = new CopyOnWriteArrayList();
    // Dispatchers.
    private final ConcurrentMap dispatchers = new ConcurrentHashMap();
    // Producers.
    private final ConcurrentMap producers = new ConcurrentHashMap();
    // Id generators for sessions, consumers, temporary destinations and local transactions.
    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
    private final SessionId connectionSessionId;
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
    private AdvisoryConsumer advisoryConsumer;
    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
    // Broker info.
    private BrokerInfo brokerInfo;
    private IOException firstFailureError;
    private int producerWindowSize;
    private final AtomicInteger protocolVersion = new AtomicInteger(11);
    private final long timeCreated = System.currentTimeMillis();
    private final ConnectionAudit connectionAudit = new ConnectionAudit();
    private DestinationSource destinationSource;
    private final Object ensureConnectionInfoSentMutex = new Object();
    private boolean useDedicatedTaskRunner;
    protected AtomicInteger transportInterruptionProcessingComplete;
    private long consumerFailoverRedeliveryWaitPeriod;
    // Scheduler.
    private Scheduler scheduler;
    private boolean messagePrioritySupported;
    private boolean transactedIndividualAck;
    private boolean nonBlockingRedelivery;
    private boolean rmIdFromConnectionId;
    private int maxThreadPoolSize;
    private RejectedExecutionHandler rejectedTaskHandler;

    static
    {
        // The connection defaults mirror those of the connection factory.
        DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
        DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    }
}

再看其构造
   /**
    * Initializes the connection: applies defaults, creates the single-threaded connection
    * executor, builds the connection and session ids, and registers this connection as the
    * transport's listener and with the factory statistics.
    *
    * @param transport transport used by this connection
    * @param clientIdGenerator generator stored for client ids
    * @param connectionIdGenerator generator used once here to create the connection id
    * @param factoryStats factory statistics this connection adds itself to
    */
   protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats)
        throws Exception
    {
        dispatchAsync = true;
        alwaysSessionAsync = true;
        prefetchPolicy = new ActiveMQPrefetchPolicy();
        optimizedMessageDispatch = true;
        copyMessageOnSend = true;
        optimizeAcknowledgeTimeOut = 0L;
        optimizedAckScheduledAckInterval = 0L;
        nestedMapAndListEnabled = true;
        closeTimeout = 15000;
        watchTopicAdvisories = true;
        warnAboutUnstartedConnectionTimeout = 500L;
        sendTimeout = 0;
        sendAcksAsync = true;
        checkForDuplicates = true;
        queueOnlyConnection = false;
        consumerExpiryCheckEnabled = true;
        producerWindowSize = 0;
        transportInterruptionProcessingComplete = new AtomicInteger(0);
        messagePrioritySupported = false;
        transactedIndividualAck = false;
        nonBlockingRedelivery = false;
        rmIdFromConnectionId = false;
        maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
        rejectedTaskHandler = null;
        this.transport = transport;
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;
        // One worker thread, named after the transport it serves.
        executor = new ThreadPoolExecutor(1, 1, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r)
            {
                return new Thread(r, "ActiveMQ Connection Executor: " + transport);
            }
        });
        // Connection info.
        String uniqueId = connectionIdGenerator.generateId();
        info = new ConnectionInfo(new ConnectionId(uniqueId));
        info.setManageable(true);
        info.setFaultTolerant(transport.isFaultTolerant());
        // Session id reserved for the connection itself.
        connectionSessionId = new SessionId(info.getConnectionId(), -1L);
        this.transport.setTransportListener(this);
        // Create this connection's stats and register the connection with the factory stats.
        stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }


c.设置连接用户密码
connection.setUserName(userName);
connection.setPassword(password);

 /**
  * Stores the user name on this connection's {@code ConnectionInfo}.
  *
  * @param userName user name to record
  */
 protected void setUserName(String userName)
    {
        this.info.setUserName(userName);
    }

/**
 * Command object (extends {@code BaseCommand}) holding the attributes of one
 * client connection. This excerpt lists only the fields; accessors such as
 * {@code setUserName}, {@code setManageable} and {@code setFaultTolerant}
 * are called elsewhere in the article but are not shown here.
 */
public class ConnectionInfo extends BaseCommand
{
    public static final byte DATA_STRUCTURE_TYPE = 3;
    protected ConnectionId connectionId;
    protected String clientId;
    protected String clientIp;
    protected String userName;
    protected String password;
    protected BrokerId brokerPath[];
    protected boolean brokerMasterConnector;// author's note: "whether this is a cluster" -- NOTE(review): confirm exact meaning
    protected boolean manageable;
    protected boolean clientMaster;
    protected boolean faultTolerant;
    protected boolean failoverReconnect;
    // transient: excluded from default Java serialization
    protected transient Object transportContext;
}


d.配置连接
configureConnection(connection);

 /**
  * Copies this factory's settings onto a newly created connection by calling
  * the matching setter on the connection for each factory getter, then
  * registers the optional listeners that have been set on the factory.
  *
  * @param connection the connection to configure
  * @throws JMSException declared by the signature; which call raises it is
  *         not visible in this excerpt
  */
 protected void configureConnection(ActiveMQConnection connection)
        throws JMSException
    {
       // Apply the factory's connection/session properties to the connection
        connection.setPrefetchPolicy(getPrefetchPolicy());
        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
        connection.setCopyMessageOnSend(isCopyMessageOnSend());
        connection.setUseCompression(isUseCompression());
        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
        connection.setDispatchAsync(isDispatchAsync());
        connection.setUseAsyncSend(isUseAsyncSend());
        connection.setAlwaysSyncSend(isAlwaysSyncSend());
        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
        connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
        connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
        connection.setExclusiveConsumer(isExclusiveConsumer());
        connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
        connection.setTransformer(getTransformer());
        // The blob transfer policy is passed as a copy() rather than the factory's own instance
        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
        connection.setProducerWindowSize(getProducerWindowSize());
        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
        connection.setSendTimeout(getSendTimeout());
        connection.setCloseTimeout(getCloseTimeout());
        connection.setSendAcksAsync(isSendAcksAsync());
        connection.setAuditDepth(getAuditDepth());
        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
        connection.setCheckForDuplicates(isCheckForDuplicates());
        connection.setMessagePrioritySupported(isMessagePrioritySupported());
        connection.setTransactedIndividualAck(isTransactedIndividualAck());
        connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
        connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
        connection.setSessionTaskRunner(getSessionTaskRunner());
        connection.setRejectedTaskHandler(getRejectedTaskHandler());
        connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
        connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
        // Listeners are optional: each is attached only when it is non-null on the factory
        if(transportListener != null)
            connection.addTransportListener(transportListener);
        if(exceptionListener != null)
            connection.setExceptionListener(exceptionListener);
        if(clientInternalExceptionListener != null)
            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
    }

e.启动transport
transport.start();

//启动TcpTransport线程
public class TcpTransport extends TransportThreadSupport
    implements Transport, Service, Runnable{
/**
 * Thread body of the transport: logs a trace line, remembers the current
 * thread in {@code runnerThread}, then keeps calling {@link #doRun()} until
 * {@code isStopped()} returns true.
 */
public void run()
    {
        LOG.trace("TCP consumer thread for " + this + " starting");
        runnerThread = Thread.currentThread();
        while(!isStopped())
        {
            doRun();
        }
    }
    /**
     * Performs one read/dispatch step: reads a single command with
     * {@code readCommand()} and passes it to {@code doConsume(command)}.
     *
     * NOTE(review): the try block below has no catch/finally in this excerpt;
     * the handlers appear to have been left out of the article -- check the
     * full TcpTransport source before relying on this snippet.
     *
     * @throws IOException declared by the signature
     */
    protected void doRun()
        throws IOException
    {
        try
        {   
	    // Read the next command
            Object command = readCommand();
	    // Hand the command over for consumption
            doConsume(command);
        }
    }


而doConsume在TcpTransport中没有,TransportThreadSupport也没有,看TransportSupport
public abstract class TransportThreadSupport extends TransportSupport
    implements Runnable
//TransportSupport
public abstract class TransportSupport extends ServiceSupport
    implements Transport
{
    TransportListener transportListener;//消息监听器
    public void doConsume(Object command)
    {
        if(command != null)
            if(transportListener != null)
	        //启动监听器,监听消息,如果消息监听器不为空,
                transportListener.onCommand(command);
            else
                LOG.error((new StringBuilder()).append("No transportListener available to process inbound command: ").append(command).toString());
    }
}

//service辅助类
public abstract class ServiceSupport
    implements Service
{
    private AtomicBoolean started;
    private AtomicBoolean stopping;
    private AtomicBoolean stopped;
    private List serviceListeners;
      public ServiceSupport()
    {
        started = new AtomicBoolean(false);
        stopping = new AtomicBoolean(false);
        stopped = new AtomicBoolean(false);
	//新建线程安全的服务监听器
        serviceListeners = new CopyOnWriteArrayList();
    }
    //终于找这个了,因为在TcpTransPort中看到dostart(),就不知道怎么调的,
    //ActiveMQ的Service和Tomcat的lifecyle是一回事,管理组件生命周期
    public void start()
        throws Exception
    {
        boolean success;
        if(!started.compareAndSet(false, true))
            break MISSING_BLOCK_LABEL_93;
        success = false;
        stopped.set(false);
        preStart();
	//调用dostart而,doStart为抽象函数待父类扩展
        doStart();
        success = true;
        started.set(success);
        //启动service监听器
        for(Iterator i$ = serviceListeners.iterator(); i$.hasNext(); l.started(this))
            l = (ServiceListener)i$.next();

    }
 }
 //待父类扩展
 protected abstract void doStart()
        throws Exception;
}


看TcpTransport
/**
 * TCP transport (excerpt showing only the start hook).
 */
public class TcpTransport extends TransportThreadSupport
    implements Transport, Service, Runnable
{
    /**
     * Start hook: connects, installs a new single-count latch in
     * {@code stoppedLatch}, then delegates to the superclass start hook.
     *
     * @throws Exception if connecting or the superclass start fails
     */
    protected void doStart()
        throws Exception
    {
        // Connect first
        connect();
        final CountDownLatch latch = new CountDownLatch(1);
        stoppedLatch.set(latch);
        // Delegate to TransportThreadSupport.doStart()
        super.doStart();
    }
}

来看
//连接
connect();

/**
 * Prepares the transport's socket and streams.
 *
 * Builds the optional local and remote socket addresses from
 * {@code localLocation} / {@code remoteLocation}. An existing {@code socket}
 * is bound and connected; when there is none, one is obtained from
 * {@code socketFactory}. Finally the socket and the I/O streams are initialised.
 *
 * @throws IllegalStateException if neither socket nor socketFactory is set
 * @throws Exception propagated from address resolution, the socket calls or
 *         the initialisation helpers
 */
protected void connect()
        throws Exception
    {
        if(socket == null && socketFactory == null)
        {
            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
        }

        // Local address, only when a local location is configured
        InetSocketAddress bindAddr = null;
        if(localLocation != null)
        {
            InetAddress bindHost = InetAddress.getByName(localLocation.getHost());
            bindAddr = new InetSocketAddress(bindHost, localLocation.getPort());
        }

        // Remote address, built from the resolved remote host name
        InetSocketAddress peerAddr = null;
        if(remoteLocation != null)
        {
            String peerHost = resolveHostName(remoteLocation.getHost());
            peerAddr = new InetSocketAddress(peerHost, remoteLocation.getPort());
        }

        trafficClassSet = setTrafficClass(socket);

        if(socket == null)
        {
            // No socket supplied: ask the factory for one, with or without a local address
            if(bindAddr == null)
            {
                socket = socketFactory.createSocket(peerAddr.getAddress(), peerAddr.getPort());
            }
            else
            {
                socket = socketFactory.createSocket(peerAddr.getAddress(), peerAddr.getPort(), bindAddr.getAddress(), bindAddr.getPort());
            }
        }
        else
        {
            if(bindAddr != null)
            {
                socket.bind(bindAddr);
            }
            if(peerAddr != null)
            {
                // A negative connectionTimeout selects the connect() overload without a timeout
                if(connectionTimeout < 0)
                {
                    socket.connect(peerAddr);
                }
                else
                {
                    socket.connect(peerAddr, connectionTimeout);
                }
            }
        }

        // Initialise the socket, then the input/output streams
        initialiseSocket(socket);
        initializeStreams();
    }

初始化socket
initialiseSocket(socket);

/**
 * Applies the transport's socket settings to {@code sock}.
 *
 * All settings, including the generic {@code socketOptions}, are applied to
 * the socket passed in, so the method configures the socket the caller asked
 * for rather than whatever the {@code socket} field currently holds.
 *
 * @param sock the socket to configure
 * @throws SocketException if a socket option other than the buffer sizes
 *         cannot be set (buffer-size failures are logged and ignored)
 * @throws IllegalArgumentException if {@code socketOptions} contains entries
 *         that could not be applied to the socket
 */
protected void initialiseSocket(Socket sock)
        throws SocketException, IllegalArgumentException
    {
        if(socketOptions != null)
        {
            // Work on a copy; whatever is still in the copy afterwards is reported as invalid
            Map<String, Object> copy = new HashMap<String, Object>(socketOptions);
            IntrospectionSupport.setProperties(sock, copy);
            if(!copy.isEmpty())
                throw new IllegalArgumentException((new StringBuilder()).append("Invalid socket parameters: ").append(copy).toString());
        }
        try
        {
            // Receive and send buffer sizes
            sock.setReceiveBufferSize(socketBufferSize);
            sock.setSendBufferSize(socketBufferSize);
        }
        catch(SocketException se)
        {
            // Best effort: a failure to set the buffer sizes is logged and ignored
            LOG.warn((new StringBuilder()).append("Cannot set socket buffer size = ").append(socketBufferSize).toString());
            LOG.debug((new StringBuilder()).append("Cannot set socket buffer size. Reason: ").append(se.getMessage()).append(". This exception is ignored.").toString(), se);
        }
        // SO_TIMEOUT: timeout for blocking reads on the socket
        sock.setSoTimeout(soTimeout);
        if(keepAlive != null)
            // SO_KEEPALIVE, only when explicitly configured
            sock.setKeepAlive(keepAlive.booleanValue());
        // soLinger > -1 enables SO_LINGER with that value, -1 disables it, other values leave it untouched
        if(soLinger > -1)
            sock.setSoLinger(true, soLinger);
        else
        if(soLinger == -1)
            sock.setSoLinger(false, 0);
        if(tcpNoDelay != null)
            // TCP_NODELAY, only when explicitly configured
            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
        if(!trafficClassSet)
            trafficClassSet = setTrafficClass(sock);
    }

初始化输入输出流
initializeStreams();

 /**
  * Creates the transport's input and output streams on top of the socket.
  *
  * The buffered input stream is an anonymous subclass that increments
  * {@code receiveCounter} on every read, skip and fill before delegating to
  * the superclass. The decompiler's synthetic outer-instance field and
  * constructor initializer are removed: the compiler generates both itself.
  *
  * @throws Exception if the socket's streams cannot be obtained
  */
 protected void initializeStreams()
        throws Exception
    {
        // Buffered TCP input stream that counts receive activity
        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {

            @Override
            public int read()
                throws IOException
            {
                receiveCounter++;
                return super.read();
            }

            @Override
            public int read(byte b[], int off, int len)
                throws IOException
            {
                receiveCounter++;
                return super.read(b, off, len);
            }

            @Override
            public long skip(long n)
                throws IOException
            {
                receiveCounter++;
                return super.skip(n);
            }

            @Override
            protected void fill()
                throws IOException
            {
                receiveCounter++;
                super.fill();
            }
        };
        // Data input stream over the buffered input
        dataIn = new DataInputStream(buffIn);
        // Buffered TCP output stream
        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
        // Data output stream over the buffered output; buffOut keeps the buffered stream itself
        dataOut = new DataOutputStream(outputStream);
        buffOut = outputStream;
    }

再看TransportThreadSupport
//TransportThreadSupport
/**
 * Transport base class (excerpt) that runs itself on a dedicated thread.
 */
public abstract class TransportThreadSupport extends TransportSupport
    implements Runnable
{
    private boolean daemon;
    private Thread runner;
    private long stackSize;

    /**
     * Start hook: creates the runner thread for this Runnable, named
     * "ActiveMQ Transport: " plus {@code toString()}, with the configured
     * stack size and daemon flag, and starts it.
     *
     * @throws Exception declared by the signature
     */
    protected void doStart()
        throws Exception
    {
        final String threadName = "ActiveMQ Transport: " + toString();
        runner = new Thread(null, this, threadName, stackSize);
        runner.setDaemon(daemon);
        runner.start();
    }

}

从上可以看出,启动transport主要做了几件事:建立并初始化Socket,创建transport的数据输入流DataInputStream和数据输出流DataOutputStream,
启动读取并分发命令的transport线程(命令最终交给transport监听器处理),启动成功后通知Service监听器

再来看
启动连接
connection.start(); 
 /**
  * Starts the connection. After {@code checkClosedOrFailed()} and
  * {@code ensureConnectionInfoSent()}, the caller that flips {@code started}
  * from false to true starts every session in {@code sessions}; any other
  * call skips the session loop.
  *
  * @throws JMSException declared by the signature
  */
 public void start()
        throws JMSException
    {
        checkClosedOrFailed();
        ensureConnectionInfoSent();
        if(!started.compareAndSet(false, true))
        {
            return;
        }
        // Start each session in turn
        Iterator it = sessions.iterator();
        while(it.hasNext())
        {
            ActiveMQSession current = (ActiveMQSession)it.next();
            current.start();
        }
    }

今天先讲这么多,剩下的下一篇再讲,总结一下:
ActiveMQConnectionFactory的创建过程,主要是初始化broker url、用户密码、是否压缩、是否异步发送消息、是否支持消息优先级、是否非阻塞重发、最大线程数、生产者窗口大小等属性;从ActiveMQConnectionFactory创建连接时,首先通过TcpTransportFactory创建TcpTransport,然后包装成带锁机制的MutexTransport,最后包装成
ResponseCorrelator;再根据TcpTransport和ActiveMQConnection状态管理器JMSStatsImpl创建ActiveMQConnection,创建ActiveMQConnection的过程中,主要是初始化是否异步分发消息、线程执行器、连接状态管理器、调度器等;然后通过ConnectionInfo设置连接的用户名和密码,并配置是否支持消息优先级、是否非阻塞重发、最大线程数、生产者窗口大小、Transport监听器transportListener等;
最后启动TcpTransport和Connection,启动TcpTransport主要是初始化socket、
ip、端口、输入输出缓冲区以及输入输出流DataInputStream/DataOutputStream;启动连接主要是启动会话ActiveMQSession,这个我们在后面再看,以及下面的3,4,5,6。



3.会话
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); 

4.消息队列和订阅主题
Queue  destination = session.createQueue(qname);
 Topic  destination = session.createTopic(tname)

5.创建生产者
MessageProducer producer = session.createProducer(destination);  
producer.setDeliveryMode(DeliveryMode.PERSISTENT);  


6.发送消息
sendMessage(session, producer); 
Queue
 /**
  * Queue example: creates five text messages numbered 1 to 5, prints a line
  * for each one and sends it with the producer.
  *
  * Author's note on the loop bound: "there is a limit; it stops working at
  * 1000" -- NOTE(review): not verifiable from this snippet.
  *
  * @param session  session used to create the text messages
  * @param producer producer used to send them
  * @throws Exception declared by the signature
  */
 public static void sendMessage(Session session, MessageProducer producer)
           throws Exception {
       int seq = 1;
       while (seq <= 5) {
           TextMessage message = session.createTextMessage("向ActiveMq发送的Queue消息" + seq);
           // Log, then send the message to the destination
           System.out.println("发送消息:" + "ActiveMq 发送的Queue消息" + seq);
           producer.send(message);
           seq++;
       }
   }
Topic
 /**
    * Topic example: fills in an {@code Order}, wraps it in an ObjectMessage
    * created by the session, prints a log line and sends the message with
    * the producer.
    *
    * @param session  session used to create the object message
    * @param producer producer used to send it
    * @throws Exception declared by the signature
    */
   public static void sendMessage(Session session, MessageProducer producer)  
           throws Exception {  
       Order order = new Order();
       order.setId(1);
       order.setAmount(150.62);
       order.setGoodsId(15);
       order.setGoodsAmount(2);
       order.setShopId(5656);
       // Alternative: convert the object to a JSON string, send it as a TextMessage, and turn the JSON back into an object on the consumer side
       ObjectMessage orderMess = session.createObjectMessage(order);
       System.out.println("向ActiveMq:"+tname+"发送订单信息:" + "ActiveMq 发送的Topic消息"); 
       producer.send(orderMess); 
   }  






//ConnectionFactory
package javax.jms;
/**
 * Factory for {@link Connection} objects (decompiled listing).
 */
public interface ConnectionFactory {

    /**
     * Creates a connection without arguments.
     *
     * @return the new connection
     * @throws JMSException declared by the signature
     */
    Connection createConnection() throws JMSException;

    /**
     * Creates a connection from two string arguments.
     *
     * @param s  first argument -- presumably the user name; confirm against the JMS API
     * @param s1 second argument -- presumably the password; confirm against the JMS API
     * @return the new connection
     * @throws JMSException declared by the signature
     */
    Connection createConnection(String s, String s1) throws JMSException;
}

//QueueConnectionFactory
package javax.jms;
/**
 * {@link ConnectionFactory} extension that creates {@link QueueConnection}
 * objects (decompiled listing).
 */
public interface QueueConnectionFactory extends ConnectionFactory {

    /**
     * Creates a queue connection without arguments.
     *
     * @return the new queue connection
     * @throws JMSException declared by the signature
     */
    QueueConnection createQueueConnection() throws JMSException;

    /**
     * Creates a queue connection from two string arguments.
     *
     * @param s  first argument -- presumably the user name; confirm against the JMS API
     * @param s1 second argument -- presumably the password; confirm against the JMS API
     * @return the new queue connection
     * @throws JMSException declared by the signature
     */
    QueueConnection createQueueConnection(String s, String s1) throws JMSException;
}

//TopicConnectionFactory
package javax.jms;

/**
 * {@link ConnectionFactory} extension that creates {@link TopicConnection}
 * objects (decompiled listing).
 */
public interface TopicConnectionFactory extends ConnectionFactory {

    /**
     * Creates a topic connection without arguments.
     *
     * @return the new topic connection
     * @throws JMSException declared by the signature
     */
    TopicConnection createTopicConnection() throws JMSException;

    /**
     * Creates a topic connection from two string arguments.
     *
     * @param s  first argument -- presumably the user name; confirm against the JMS API
     * @param s1 second argument -- presumably the password; confirm against the JMS API
     * @return the new topic connection
     * @throws JMSException declared by the signature
     */
    TopicConnection createTopicConnection(String s, String s1) throws JMSException;
}

//IdGenerator 客户端,连接ID产生器
/**
 * Generator for client / connection ids (excerpt). Each instance builds a
 * seed from the given prefix, the shared {@code UNIQUE_STUB} and a
 * per-instance counter; every generated id is that seed followed by the
 * next value of an increasing sequence that starts at 1.
 *
 * NOTE(review): the static initializer that assigns UNIQUE_STUB is not part
 * of this excerpt.
 */
public class IdGenerator
{
    private static final String UNIQUE_STUB;
    private static int instanceCount;
    private static String hostName;
    private String seed;
    private final AtomicLong sequence;
    private int length;
    public static final String PROPERTY_IDGENERATOR_HOSTNAME = "activemq.idgenerator.hostname";
    public static final String PROPERTY_IDGENERATOR_LOCALPORT = "activemq.idgenerator.localport";
    public static final String PROPERTY_IDGENERATOR_PORT = "activemq.idgenerator.port";

    /**
     * Builds the seed {@code prefix + UNIQUE_STUB + instanceCount + ":"} while
     * holding the monitor of {@code UNIQUE_STUB}, and incrementing the shared
     * instance counter.
     *
     * @param prefix text placed at the front of every generated id
     */
    public IdGenerator(String prefix)
    {
        sequence = new AtomicLong(1L);
        synchronized(UNIQUE_STUB)
        {
            seed = prefix + UNIQUE_STUB + (instanceCount++) + ":";
            // Capacity hint: the seed plus the digit count of the literal below
            length = seed.length() + "9223372036854775807".length();
        }
    }

    /**
     * Returns the seed followed by the next sequence number.
     *
     * @return the generated id
     */
    public synchronized String generateId()
    {
        return new StringBuilder(length)
            .append(seed)
            .append(sequence.getAndIncrement())
            .toString();
    }
}

//QueueConnection
/**
 * {@link Connection} extension with queue-specific factory methods
 * (decompiled listing; parameter names are decompiler-generated).
 */
public interface QueueConnection extends Connection {

    /**
     * Creates a queue session.
     *
     * @param flag boolean argument -- presumably the "transacted" flag; confirm against the JMS API
     * @param i    int argument -- presumably the acknowledge mode; confirm against the JMS API
     * @return the new queue session
     * @throws JMSException declared by the signature
     */
    QueueSession createQueueSession(boolean flag, int i) throws JMSException;

    /**
     * Creates a connection consumer for a queue.
     *
     * @param queue             the queue to consume from
     * @param s                 string argument -- presumably a message selector; confirm against the JMS API
     * @param serversessionpool the server session pool to use
     * @param i                 int argument -- presumably the maximum number of messages; confirm against the JMS API
     * @return the new connection consumer
     * @throws JMSException declared by the signature
     */
    ConnectionConsumer createConnectionConsumer(Queue queue, String s, ServerSessionPool serversessionpool, int i)
        throws JMSException;
}


//TopicConnection
 /**
  * {@link Connection} extension with topic-specific factory methods
  * (decompiled listing; parameter names are decompiler-generated).
  */
 public interface TopicConnection extends Connection {

    /**
     * Creates a topic session.
     *
     * @param flag boolean argument -- presumably the "transacted" flag; confirm against the JMS API
     * @param i    int argument -- presumably the acknowledge mode; confirm against the JMS API
     * @return the new topic session
     * @throws JMSException declared by the signature
     */
    TopicSession createTopicSession(boolean flag, int i) throws JMSException;

    /**
     * Creates a connection consumer for a topic.
     *
     * @param topic             the topic to consume from
     * @param s                 string argument -- presumably a message selector; confirm against the JMS API
     * @param serversessionpool the server session pool to use
     * @param i                 int argument -- presumably the maximum number of messages; confirm against the JMS API
     * @return the new connection consumer
     * @throws JMSException declared by the signature
     */
    ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serversessionpool, int i)
        throws JMSException;

    /**
     * Creates a durable connection consumer for a topic.
     *
     * @param topic             the topic to consume from
     * @param s                 first string argument -- presumably the subscription name; confirm against the JMS API
     * @param s1                second string argument -- presumably a message selector; confirm against the JMS API
     * @param serversessionpool the server session pool to use
     * @param i                 int argument -- presumably the maximum number of messages; confirm against the JMS API
     * @return the new connection consumer
     * @throws JMSException declared by the signature
     */
    ConnectionConsumer createDurableConnectionConsumer(Topic topic, String s, String s1, ServerSessionPool serversessionpool, int i)
        throws JMSException;
}


//Cloneable
package java.lang;

/**
 * A class implements the {@code Cloneable} interface to
 * indicate to the {@link java.lang.Object#clone()} method that it
 * is legal for that method to make a
 * field-for-field copy of instances of that class.
 * <p>
 * Invoking Object's clone method on an instance that does not implement the
 * {@code Cloneable} interface results in the exception
 * {@code CloneNotSupportedException} being thrown.
 * <p>
 * By convention, classes that implement this interface should override
 * {@code Object.clone} (which is protected) with a public method.
 * See {@link java.lang.Object#clone()} for details on overriding this
 * method.
 * <p>
 * Note that this interface does <i>not</i> contain the {@code clone} method.
 * Therefore, it is not possible to clone an object merely by virtue of the
 * fact that it implements this interface.  Even if the clone method is invoked
 * reflectively, there is no guarantee that it will succeed.
 *
 * @author  unascribed
 * @see     java.lang.CloneNotSupportedException
 * @see     java.lang.Object#clone()
 * @since   JDK1.0
 */
public interface Cloneable {
}