yunlian0621 阅读(70) 评论(0)

java代码:

 

package com.yanzhi.system;

import com.yanzhi.tools.C;
import com.yanzhi.tools.Global;
import com.yanzhi.tools.StringUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.connection.SingleConnectionFactory;

import javax.jms.*;

import java.sql.Timestamp;
import java.util.List;

/**
 * Created by xiaoyunlian on 2016/2/14.
 * activemq消费者实例
 */
public class ConsumerApp implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApp.class);

    public static void start(){
       try {
           //连接
           ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory();
           targetConnectionFactory.setBrokerURL(Global.getBrokerURL());
           targetConnectionFactory.setTrustAllPackages(true);
           SingleConnectionFactory connectionFactory = new SingleConnectionFactory();
           connectionFactory.setTargetConnectionFactory(targetConnectionFactory);//根据applicationContext.xml文件配置连接
           Connection connection = connectionFactory.createConnection();
           connection.start();
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           //队列
           ActiveMQQueue yanzhiQueueDestination = (ActiveMQQueue) session.createQueue("pkQuene,faceValueReportQuene,regUserQuene");

           ConsumerApp consumerMessageListener = new ConsumerApp();

           MessageConsumer consumer = session.createConsumer(yanzhiQueueDestination);
           consumer.setMessageListener(consumerMessageListener);
           System.err.println("~~~~~~~~~~~~~~~~~~~~~~ active MQ 消费者监听器 启动成功~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
       }catch (Exception e){
           e.printStackTrace();
       }

    }

    @Override
    public void onMessage(Message msg) {
        try {
            Destination dest = msg.getJMSDestination();
            if (msg instanceof TextMessage) {
                ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)msg;
                String name = activeMQTextMessage.getDestination().getPhysicalName();
                TextMessage message = (TextMessage) msg;
                System.err.println("队列:"+name+"接收者接到一个String消息:"+message.getText());
            } else if (msg instanceof MapMessage) {
                ActiveMQMapMessage activeMQMapMessage = (ActiveMQMapMessage) msg;
                String destinationName = activeMQMapMessage.getDestination().getPhysicalName();
            } else if (msg instanceof StreamMessage) {
                StreamMessage message = (StreamMessage) msg;
                System.out.println("------Received StreamMessage------");
                System.out.println(message.readString());
                System.out.println(message.readBoolean());
                System.out.println(message.readLong());
            } else if (msg instanceof ObjectMessage) {
                ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage)msg;
                String destinationName = activeMQObjectMessage.getDestination().getPhysicalName();
                if (Global.getPkQuene().equals(destinationName)){
                    ObjectMessage objectMessage = (ObjectMessage) msg;
                    Object object = objectMessage.getObject();
                    if (object instanceof List){
                        // do something
                    }
                    
                }
                if (Global.getFaceValueReportQuene().equals(destinationName)){
                    // do something
                }
                if (Global.getRegUserQuene().equals(destinationName)){
                    // do something
                }
            } else if (msg instanceof BytesMessage) {
                System.out.println("------Received BytesMessage------");
                BytesMessage message = (BytesMessage) msg;
                byte[] byteContent = new byte[1024];
                int length = -1;
                StringBuffer content = new StringBuffer();
                while ((length = message.readBytes(byteContent)) != -1) {
                    content.append(new String(byteContent, 0, length));
                }
                System.out.println(content.toString());
            } else {
                System.out.println(msg);
            }
        } catch (JMSException e) {
            LOGGER.error("error {}", e);
        }
    }
}

调用上面的start()方法,即可启动消息队列的消费者。