diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-04-23 15:54:15 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-23 15:54:15 +0000 |
| commit | e70ee3e8bff300dff233829741d2ddd2cb89b117 (patch) | |
| tree | afda2db38f34b367a94d328527edd608f4755cba /java | |
| parent | 8de38d68d7efe3b286da38c996cd8eabbe67ddc8 (diff) | |
| download | qpid-python-e70ee3e8bff300dff233829741d2ddd2cb89b117.tar.gz | |
QPID-436 - topic exchange doesn't obey the mandatory flag
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@531513 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
2 files changed, 162 insertions, 25 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 8a50e93bf9..605a4bcb61 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -57,19 +57,16 @@ public class DestWildExchange extends AbstractExchange private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); - /** - * DestWildExchangeMBean class implements the management interface for the - * Topic exchanges. - */ + /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") private final class DestWildExchangeMBean extends ExchangeMBean { // open mbean data types for representing exchange bindings - private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; - private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; + private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; + private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; private OpenType[] _bindingItemTypes = new OpenType[2]; - private CompositeType _bindingDataType = null; - private TabularType _bindinglistDataType = null; + private CompositeType _bindingDataType = null; + private TabularType _bindinglistDataType = null; private TabularDataSupport _bindingList = null; @MBeanConstructor("Creates an MBean for AMQ topic exchange") @@ -80,22 +77,18 @@ public class DestWildExchange extends AbstractExchange init(); } - /** - * initialises the OpenType objects. - */ + /** initialises the OpenType objects. */ private void init() throws OpenDataException { _bindingItemTypes[0] = SimpleType.STRING; _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names", - _bindingItemNames, _bindingItemNames, _bindingItemTypes); + _bindingItemNames, _bindingItemNames, _bindingItemTypes); _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(), - _bindingDataType, _bindingItemIndexNames); + _bindingDataType, _bindingItemIndexNames); } - /** - * returns exchange bindings in tabular form - */ + /** returns exchange bindings in tabular form */ public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); @@ -122,7 +115,9 @@ public class DestWildExchange extends AbstractExchange { AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) + { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } try { @@ -159,7 +154,7 @@ public class DestWildExchange extends AbstractExchange { queueList.add(queue); } - else if(_logger.isDebugEnabled()) + else if (_logger.isDebugEnabled()) { _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); } @@ -176,10 +171,18 @@ public class DestWildExchange extends AbstractExchange // TODO: add support for the immediate flag if (queues == null) { - _logger.warn("No queues found for routing key " + routingKey); - _logger.warn("Routing map contains: " + _routingKey2queues); - //todo Check for valid topic - mritchie - return; + if (info.isMandatory()) + { + String msg = "Topic " + routingKey + " is not known to " + this; + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn("No queues found for routing key " + routingKey); + _logger.warn("Routing map contains: " + _routingKey2queues); + //todo Check for valid topic - mritchie + return; + } } for (AMQQueue q : queues) @@ -245,7 +248,7 @@ public class DestWildExchange extends AbstractExchange } } - protected ExchangeMBean createMBean() throws AMQException + protected ExchangeMBean createMBean() throws AMQException { try { diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index 87491ed3d3..ca352b2fd7 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -22,6 +22,8 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
+ private static final String VIRTUALHOST = "test";
+ private static final String BROKER = "vm://:1";
static
{
@@ -53,10 +55,10 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex *
* @throws Exception
*/
- public void testReturnUnroutableMandatoryMessage() throws Exception
+ public void testReturnUnroutableMandatoryMessage_HEADERS() throws Exception
{
_bouncedMessageList.clear();
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -70,7 +72,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
// This is the default now
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
con2.setExceptionListener(this);
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -127,6 +129,138 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex }
+ public void testReturnUnroutableMandatoryMessage_QUEUE() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
+
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ AMQQueue valid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE");
+ AMQQueue invalid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE_INVALID");
+ MessageConsumer consumer = consumerSession.createConsumer(valid_queue);
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
+
+ Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_queue, false, false);
+ MessageProducer mandatoryProducer = producerSession.createProducer(invalid_queue);
+
+ // First test - should be routed
+ _logger.info("Sending non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver", tm != null);
+ assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch (InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
+
+
+ con.close();
+ con2.close();
+ }
+
+
+ public void testReturnUnroutableMandatoryMessage_TOPIC() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
+
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ AMQTopic valid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, "test.Return.Unroutable.Mandatory.Message.TOPIC");
+ AMQTopic invalid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, "test.Return.Unroutable.Mandatory.Message.TOPIC.invalid");
+ MessageConsumer consumer = consumerSession.createConsumer(valid_topic);
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
+
+ Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_topic, false, false);
+ MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic);
+
+ // First test - should be routed
+ _logger.info("Sending non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver", tm != null);
+ assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch (InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
+
+
+ con.close();
+ con2.close();
+ }
+
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class);
|
