From e70ee3e8bff300dff233829741d2ddd2cb89b117 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 23 Apr 2007 15:54:15 +0000 Subject: QPID-436 - topic exchange doesn't obey the mandatory flag git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@531513 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/DestWildExchange.java | 47 +++---- .../ReturnUnroutableMandatoryMessageTest.java | 140 ++++++++++++++++++++- 2 files changed, 162 insertions(+), 25 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 8a50e93bf9..605a4bcb61 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -57,19 +57,16 @@ public class DestWildExchange extends AbstractExchange private ConcurrentHashMap> _routingKey2queues = new ConcurrentHashMap>(); - /** - * DestWildExchangeMBean class implements the management interface for the - * Topic exchanges. - */ + /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") private final class DestWildExchangeMBean extends ExchangeMBean { // open mbean data types for representing exchange bindings - private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; - private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; + private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; + private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; private OpenType[] _bindingItemTypes = new OpenType[2]; - private CompositeType _bindingDataType = null; - private TabularType _bindinglistDataType = null; + private CompositeType _bindingDataType = null; + private TabularType _bindinglistDataType = null; private TabularDataSupport _bindingList = null; @MBeanConstructor("Creates an MBean for AMQ topic exchange") @@ -80,22 +77,18 @@ public class DestWildExchange extends AbstractExchange init(); } - /** - * initialises the OpenType objects. - */ + /** initialises the OpenType objects. */ private void init() throws OpenDataException { _bindingItemTypes[0] = SimpleType.STRING; _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names", - _bindingItemNames, _bindingItemNames, _bindingItemTypes); + _bindingItemNames, _bindingItemNames, _bindingItemTypes); _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(), - _bindingDataType, _bindingItemIndexNames); + _bindingDataType, _bindingItemIndexNames); } - /** - * returns exchange bindings in tabular form - */ + /** returns exchange bindings in tabular form */ public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); @@ -122,7 +115,9 @@ public class DestWildExchange extends AbstractExchange { AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) + { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } try { @@ -159,7 +154,7 @@ public class DestWildExchange extends AbstractExchange { queueList.add(queue); } - else if(_logger.isDebugEnabled()) + else if (_logger.isDebugEnabled()) { _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); } @@ -176,10 +171,18 @@ public class DestWildExchange extends AbstractExchange // TODO: add support for the immediate flag if (queues == null) { - _logger.warn("No queues found for routing key " + routingKey); - _logger.warn("Routing map contains: " + _routingKey2queues); - //todo Check for valid topic - mritchie - return; + if (info.isMandatory()) + { + String msg = "Topic " + routingKey + " is not known to " + this; + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn("No queues found for routing key " + routingKey); + _logger.warn("Routing map contains: " + _routingKey2queues); + //todo Check for valid topic - mritchie + return; + } } for (AMQQueue q : queues) @@ -245,7 +248,7 @@ public class DestWildExchange extends AbstractExchange } } - protected ExchangeMBean createMBean() throws AMQException + protected ExchangeMBean createMBean() throws AMQException { try { diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index 87491ed3d3..ca352b2fd7 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -22,6 +22,8 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class); private final List _bouncedMessageList = Collections.synchronizedList(new ArrayList()); + private static final String VIRTUALHOST = "test"; + private static final String BROKER = "vm://:1"; static { @@ -53,10 +55,10 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex * * @throws Exception */ - public void testReturnUnroutableMandatoryMessage() throws Exception + public void testReturnUnroutableMandatoryMessage_HEADERS() throws Exception { _bouncedMessageList.clear(); - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -70,7 +72,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); // This is the default now - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST); con2.setExceptionListener(this); AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -127,6 +129,138 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex } + public void testReturnUnroutableMandatoryMessage_QUEUE() throws Exception + { + _bouncedMessageList.clear(); + Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST); + + + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + AMQQueue valid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE"); + AMQQueue invalid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE_INVALID"); + MessageConsumer consumer = consumerSession.createConsumer(valid_queue); + + //force synch to ensure the consumer has resulted in a bound queue + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + // This is the default now + + Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST); + + con2.setExceptionListener(this); + AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Need to start the "producer" connection in order to receive bounced messages + _logger.info("Starting producer connection"); + con2.start(); + + + MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_queue, false, false); + MessageProducer mandatoryProducer = producerSession.createProducer(invalid_queue); + + // First test - should be routed + _logger.info("Sending non-mandatory message"); + TextMessage msg1 = producerSession.createTextMessage("msg1"); + nonMandatoryProducer.send(msg1); + + // Second test - should be bounced + _logger.info("Sending non-routable mandatory message"); + TextMessage msg2 = producerSession.createTextMessage("msg2"); + mandatoryProducer.send(msg2); + + + _logger.info("Starting consumer connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(1000L); + + assertTrue("No message routed to receiver", tm != null); + assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText())); + + try + { + Thread.sleep(1000L); + } + catch (InterruptedException e) + { + ; + } + + assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1); + Message m = _bouncedMessageList.get(0); + assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2")); + + + con.close(); + con2.close(); + } + + + public void testReturnUnroutableMandatoryMessage_TOPIC() throws Exception + { + _bouncedMessageList.clear(); + Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST); + + + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + AMQTopic valid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, "test.Return.Unroutable.Mandatory.Message.TOPIC"); + AMQTopic invalid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, "test.Return.Unroutable.Mandatory.Message.TOPIC.invalid"); + MessageConsumer consumer = consumerSession.createConsumer(valid_topic); + + //force synch to ensure the consumer has resulted in a bound queue + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + // This is the default now + + Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST); + + con2.setExceptionListener(this); + AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Need to start the "producer" connection in order to receive bounced messages + _logger.info("Starting producer connection"); + con2.start(); + + + MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_topic, false, false); + MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic); + + // First test - should be routed + _logger.info("Sending non-mandatory message"); + TextMessage msg1 = producerSession.createTextMessage("msg1"); + nonMandatoryProducer.send(msg1); + + // Second test - should be bounced + _logger.info("Sending non-routable mandatory message"); + TextMessage msg2 = producerSession.createTextMessage("msg2"); + mandatoryProducer.send(msg2); + + + _logger.info("Starting consumer connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(1000L); + + assertTrue("No message routed to receiver", tm != null); + assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText())); + + try + { + Thread.sleep(1000L); + } + catch (InterruptedException e) + { + ; + } + + assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1); + Message m = _bouncedMessageList.get(0); + assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2")); + + + con.close(); + con2.close(); + } + + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class); -- cgit v1.2.1