一、简介

Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,

你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。

如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作。

  

二、使用

ActiveMQ从4.x版本起开始支持Exclusive Consumer。 Broker会从多个consumers中挑选一个consumer来处理queue中

所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其它的consumer。 

可以通过DestinationOptions 来创建一个Exclusive Consumer,如下:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");consumer = session.createConsumer(queue);

还可以给consumer设置优先级,以便针对网络情况进行优化,如下:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");

三、代码测试

发送:

public void test5() throws Exception {    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");    Connection connection = connectionFactory.createConnection();    connection.start();    Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);    Destination destination = session.createQueue("test-queue2");    MessageProducer producer = session.createProducer(destination);    for (int i = 0; i < 3; i++) {        TextMessage message = session.createTextMessage("message Exclusive --" + i);	producer.send(message);    }    session.commit();    session.close();    connection.close();}

接收:

public void test5() throws Exception {    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");		    Connection connection = cf.createConnection();    connection.start();    		    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);    Destination queue = session.createQueue("test-queue2");                           // 对应效果1    //  Destination queue = session.createQueue("test-queue2?consumer.exclusive=true");   // 对应效果2		    for (int i = 0; i < 2; i++) {    	MessageConsumer consumer = session.createConsumer(queue);	consumer.setMessageListener(new MessageListener() {	    @Override	    public void onMessage(Message message) {		TextMessage msg = (TextMessage) message;	        try {		    System.out.println(consumer + "收到消息:" + msg.getText());		    session.commit();		} catch (Exception e) {		    e.printStackTrace();		}	    }	});    }}

效果1:

效果2: