一、简介
Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,
你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。
如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作。
二、使用
ActiveMQ从4.x版本起开始支持Exclusive Consumer。 Broker会从多个consumers中挑选一个consumer来处理queue中
所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其它的consumer。
可以通过DestinationOptions 来创建一个Exclusive Consumer,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");consumer = session.createConsumer(queue);
还可以给consumer设置优先级,以便针对网络情况进行优化,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");
三、代码测试
发送:
public void test5() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("test-queue2"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message Exclusive --" + i); producer.send(message); } session.commit(); session.close(); connection.close();}
接收:
public void test5() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("test-queue2"); // 对应效果1 // Destination queue = session.createQueue("test-queue2?consumer.exclusive=true"); // 对应效果2 for (int i = 0; i < 2; i++) { MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage msg = (TextMessage) message; try { System.out.println(consumer + "收到消息:" + msg.getText()); session.commit(); } catch (Exception e) { e.printStackTrace(); } } }); }}
效果1:
效果2: