diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
| commit | 3e4d1f2f56ef296ea5132511faaa8689867c499c (patch) | |
| tree | 6c990d7d04cdaf07bd6dace7c157b882f8370cbf /qpid/java/broker/src/test | |
| parent | f6b68fa2e1ca27d46e6080a3568ef5d785eed548 (diff) | |
| download | qpid-python-3e4d1f2f56ef296ea5132511faaa8689867c499c.tar.gz | |
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
3 files changed, 293 insertions, 713 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java deleted file mode 100644 index f4c0fec6c9..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ /dev/null @@ -1,633 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MockStoredMessage; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AbstractHeadersExchangeTestBase extends QpidTestCase -{ - private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); - - private final HeadersExchange exchange = new HeadersExchange(); - private final Set<TestQueue> queues = new HashSet<TestQueue>(); - private VirtualHost _virtualHost; - private int count; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testDoNothing() - { - // this is here only to make junit under Eclipse happy - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - protected TestQueue bindDefault(String... bindings) throws AMQException - { - String queueName = "Queue" + (++count); - - return bind(queueName, queueName, getHeadersMap(bindings)); - } - - protected void unbind(TestQueue queue, String... bindings) throws AMQException - { - String queueName = queue.getName(); - exchange.onUnbind(new Binding(null, queueName, queue, exchange, getHeadersMap(bindings))); - } - - protected int getCount() - { - return count; - } - - private TestQueue bind(String key, String queueName, Map<String,Object> args) throws AMQException - { - TestQueue queue = new TestQueue(new AMQShortString(queueName), _virtualHost); - queues.add(queue); - exchange.onBind(new Binding(null, key, queue, exchange, args)); - return queue; - } - - - protected int route(Message m) throws AMQException - { - m.getIncomingMessage().headersReceived(System.currentTimeMillis()); - m.route(exchange); - if(m.getIncomingMessage().allContentReceived()) - { - for(BaseQueue q : m.getIncomingMessage().getDestinationQueues()) - { - q.enqueue(m); - } - } - return m.getIncomingMessage().getDestinationQueues().size(); - } - - protected void routeAndTest(Message m, TestQueue... expected) throws AMQException - { - routeAndTest(m, false, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException - { - routeAndTest(m, expectReturn, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException - { - routeAndTest(m, false, expected); - } - - protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException - { - int queueCount = route(m); - - for (TestQueue q : queues) - { - if (expected.contains(q)) - { - assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; - } - } - - if(expectReturn) - { - assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount); - } - - } - - static Map<String,Object> getHeadersMap(String... entries) - { - if(entries == null) - { - return null; - } - - Map<String,Object> headers = new HashMap<String,Object>(); - - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.put(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - static FieldTable getHeaders(String... entries) - { - FieldTable headers = FieldTableFactory.newFieldTable(); - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.setObject(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - - static final class MessagePublishInfoImpl implements MessagePublishInfo - { - private AMQShortString _exchange; - private boolean _immediate; - private boolean _mandatory; - private AMQShortString _routingKey; - - public MessagePublishInfoImpl(AMQShortString routingKey) - { - _routingKey = routingKey; - } - - public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - public AMQShortString getExchange() - { - return _exchange; - } - - public boolean isImmediate() - { - return _immediate; - - } - - public boolean isMandatory() - { - return _mandatory; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - - public void setExchange(AMQShortString exchange) - { - _exchange = exchange; - } - - public void setImmediate(boolean immediate) - { - _immediate = immediate; - } - - public void setMandatory(boolean mandatory) - { - _mandatory = mandatory; - } - - public void setRoutingKey(AMQShortString routingKey) - { - _routingKey = routingKey; - } - } - - static MessagePublishInfo getPublishRequest(final String id) - { - return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id)); - } - - static ContentHeaderBody getContentHeader(FieldTable headers) - { - ContentHeaderBody header = new ContentHeaderBody(); - header.setProperties(getProperties(headers)); - return header; - } - - static BasicContentHeaderProperties getProperties(FieldTable headers) - { - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setHeaders(headers); - return properties; - } - - static class TestQueue extends SimpleAMQQueue - { - private final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); - - public String toString() - { - return getNameShortString().toString(); - } - - public TestQueue(AMQShortString name, VirtualHost host) throws AMQException - { - super(UUIDGenerator.generateRandomUUID(), name, false, new AMQShortString("test"), true, false, host, Collections.EMPTY_MAP); - host.getQueueRegistry().registerQueue(this); - } - - - - /** - * We override this method so that the default behaviour, which attempts to use a delivery manager, is - * not invoked. It is unnecessary since for this test we only care to know whether the message was - * sent to the queue; the queue processing logic is not being tested. - * @param msg - * @throws AMQException - */ - @Override - public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException - { - messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); - final QueueEntry queueEntry = new QueueEntry() - { - - public AMQQueue getQueue() - { - return null; - } - - public AMQMessage getMessage() - { - return null; - } - - public long getSize() - { - return 0; - } - - public boolean getDeliveredToConsumer() - { - return false; - } - - public boolean expired() throws AMQException - { - return false; - } - - public boolean isAvailable() - { - return false; - } - - public boolean isAcquired() - { - return false; - } - - public boolean acquire() - { - return false; - } - - public boolean acquire(Subscription sub) - { - return false; - } - - public boolean delete() - { - return false; - } - - public boolean isDeleted() - { - return false; - } - - public boolean acquiredBySubscription() - { - return false; - } - - public boolean isAcquiredBy(Subscription subscription) - { - return false; - } - - public void release() - { - - } - - public void setRedelivered() - { - - } - - public AMQMessageHeader getMessageHeader() - { - return null; - } - - public boolean isPersistent() - { - return false; - } - - public boolean isRedelivered() - { - return false; - } - - public Subscription getDeliveredSubscription() - { - return null; - } - - public void reject() - { - - } - - public boolean isRejectedBy(long subscriptionId) - { - return false; - } - - public void dequeue() - { - - } - - public void dispose() - { - - } - - public void discard() - { - - } - - public void routeToAlternate() - { - - } - - public boolean isQueueDeleted() - { - return false; - } - - public void addStateChangeListener(StateChangeListener listener) - { - - } - - public boolean removeStateChangeListener(StateChangeListener listener) - { - return false; - } - - public int compareTo(final QueueEntry o) - { - return 0; - } - - public boolean isDequeued() - { - return false; - } - - public boolean isDispensed() - { - return false; - } - - public QueueEntry getNextNode() - { - return null; - } - - public QueueEntry getNextValidEntry() - { - return null; - } - - public int getDeliveryCount() - { - return 0; - } - - public void incrementDeliveryCount() - { - } - - public void decrementDeliveryCount() - { - } - }; - - if(action != null) - { - action.onEnqueue(queueEntry); - } - - } - - boolean isInQueue(Message msg) - { - return messages.contains(msg); - } - - } - - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends AMQMessage - { - private static AtomicLong _messageId = new AtomicLong(); - - private class TestIncomingMessage extends IncomingMessage - { - - public TestIncomingMessage(final long messageId, - final MessagePublishInfo info, - final AMQProtocolSession publisher) - { - super(info); - } - - - public ContentHeaderBody getContentHeader() - { - return Message.this.getContentHeaderBody(); - } - } - - private IncomingMessage _incoming; - - - Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException - { - this(protocolSession, id, getHeaders(headers)); - } - - Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException - { - this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST); - } - - public IncomingMessage getIncomingMessage() - { - return _incoming; - } - - private Message(AMQProtocolSession protocolsession, long messageId, - MessagePublishInfo publish, - ContentHeaderBody header, - List<ContentBody> bodies) throws AMQException - { - super(new MockStoredMessage(messageId, publish, header)); - - StoredMessage<MessageMetaData> storedMessage = getStoredMessage(); - - int pos = 0; - for(ContentBody body : bodies) - { - storedMessage.addContent(pos, ByteBuffer.wrap(body.getPayload())); - pos += body.getPayload().length; - } - - _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); - _incoming.setContentHeaderBody(header); - - - } - - - private Message(AMQMessage msg) throws AMQException - { - super(msg.getStoredMessage()); - } - - - - void route(Exchange exchange) throws AMQException - { - _incoming.enqueue(exchange.route(_incoming)); - } - - - public int hashCode() - { - return getKey().hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); - } - - private boolean equals(HeadersExchangeTest.Message m) - { - return getKey().equals(m.getKey()); - } - - public String toString() - { - return getKey().toString(); - } - - private Object getKey() - { - try - { - return getMessagePublishInfo().getRoutingKey(); - } - catch (AMQException e) - { - _log.error("Error getting routing key: " + e, e); - return null; - } - } - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 2ddb417d5d..7b7e2ec346 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -21,22 +21,32 @@ package org.apache.qpid.server.exchange; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySet; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class FanoutExchangeTest extends TestCase { @@ -51,7 +61,9 @@ public class FanoutExchangeTest extends TestCase _virtualHost = mock(VirtualHost.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); - when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); } @@ -76,14 +88,14 @@ public class FanoutExchangeTest extends TestCase { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", - _exchange.isBound((AMQShortString) null, (FieldTable) null, queue)); + _exchange.isBound(new AMQShortString("matters"), (FieldTable) null, queue)); } public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", - _exchange.isBound((AMQShortString) null, queue)); + _exchange.isBound(new AMQShortString("matters"), queue)); } public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException @@ -95,9 +107,86 @@ public class FanoutExchangeTest extends TestCase private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException { + AMQQueue queue = mockQueue(); + _exchange.addBinding("matters", queue, null); + return queue; + } + + private AMQQueue mockQueue() + { AMQQueue queue = mock(AMQQueue.class); when(queue.getVirtualHost()).thenReturn(_virtualHost); - _exchange.addBinding("does not matter", queue, null); return queue; } + + public void testRoutingWithSelectors() throws Exception + { + AMQQueue queue1 = mockQueue(); + AMQQueue queue2 = mockQueue(); + + _exchange.addBinding("key",queue1, null); + _exchange.addBinding("key",queue2, null); + + + List<? extends BaseQueue> result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); + + + result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + _exchange.removeBinding("key",queue2,null); + + result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + + result = _exchange.route(mockMessage(false)); + + assertEquals("Expected message to be routed to queue1 only", 1, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertFalse("Expected queue2 not to be routed to", result.contains(queue2)); + + _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); + + + result = _exchange.route(mockMessage(false)); + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + + } + + private InboundMessage mockMessage(boolean val) + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.containsHeader("select")).thenReturn(true); + when(header.getHeader("select")).thenReturn(val); + when(header.getHeaderNames()).thenReturn(Collections.singleton("select")); + when(header.containsHeaders(anySet())).then(new Answer<Object>() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + final Set names = (Set) invocation.getArguments()[0]; + return names.size() == 1 && names.contains("select"); + + } + }); + final InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getMessageHeader()).thenReturn(header); + return inboundMessage; + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index bd6a02d69b..2b965358e0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -20,106 +20,230 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.server.util.BrokerTestHelper; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import junit.framework.TestCase; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySet; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class HeadersExchangeTest extends TestCase { - private AMQProtocolSession _protocolSession; + private HeadersExchange _exchange; + private VirtualHost _virtualHost; @Override public void setUp() throws Exception { super.setUp(); - BrokerTestHelper.setUp(); - _protocolSession = new InternalTestProtocolSession(getVirtualHost(), BrokerTestHelper.createBrokerMock()); + + CurrentActor.setDefault(mock(LogActor.class)); + _exchange = new HeadersExchange(); + _virtualHost = mock(VirtualHost.class); + SecurityManager securityManager = mock(SecurityManager.class); + when(_virtualHost.getSecurityManager()).thenReturn(securityManager); + when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); + + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); + } - @Override - public void tearDown() throws Exception + protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception { - BrokerTestHelper.tearDown(); - super.tearDown(); + List<? extends BaseQueue> results = _exchange.route(msg); + List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results); + unexpected.removeAll(Arrays.asList(expected)); + assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); + List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected)); + missing.removeAll(results); + assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty()); + assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size()); } - public void testSimple() throws AMQException + + private AMQQueue createAndBind(final String name, String... arguments) + throws Exception { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - TestQueue q4 = bindDefault("F0001=Bear"); - TestQueue q5 = bindDefault("F0000", "F0001"); - TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); - TestQueue q7 = bindDefault("F0000", "F0001=Bear"); - TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); - routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), - q1, q2, q3, q4, q5, q6, q7, q8); - routeAndTest(new Message(_protocolSession, "Message6", "F0002")); - - Message m7 = new Message(_protocolSession, "Message7", "XXXXX"); - - MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); - pb7.setMandatory(true); - routeAndTest(m7,true); - - Message m8 = new Message(_protocolSession, "Message8", "F0000"); - MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); - pb8.setMandatory(true); - routeAndTest(m8,false,q1); + return createAndBind(name, getArgsMapFromStrings(arguments)); + } + + private Map<String, Object> getArgsMapFromStrings(String... arguments) + { + Map<String, Object> map = new HashMap<String,Object>(); + + for(String arg : arguments) + { + if(arg.contains("=")) + { + String[] keyValue = arg.split("=",2); + map.put(keyValue[0],keyValue[1]); + } + else + { + map.put(arg,null); + } + } + return map; + } + private AMQQueue createAndBind(final String name, Map<String, Object> arguments) + throws Exception + { + AMQQueue q = create(name); + bind(name, arguments, q); + return q; + } + private void bind(String bindingKey, Map<String, Object> arguments, AMQQueue q) + throws AMQSecurityException, AMQInternalException + { + _exchange.addBinding(bindingKey,q,arguments); } - public void testAny() throws AMQException + private AMQQueue create(String name) { - TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any"); - TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); - TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); - TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); - TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4); - routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message6", "F0002")); + AMQQueue q = mock(AMQQueue.class); + when(q.toString()).thenReturn(name); + when(q.getVirtualHost()).thenReturn(_virtualHost); + return q; } - public void testMandatory() throws AMQException + + public void testSimple() throws Exception { - bindDefault("F0000"); - Message m1 = new Message(_protocolSession, "Message1", "XXXXX"); - Message m2 = new Message(_protocolSession, "Message2", "F0000"); - MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); - pb1.setMandatory(true); - MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); - pb2.setMandatory(true); - routeAndTest(m1,true); + AMQQueue q1 = createAndBind("Q1", "F0000"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark"); + AMQQueue q3 = createAndBind("Q3", "F0001"); + AMQQueue q4 = createAndBind("Q4", "F0001=Bear"); + AMQQueue q5 = createAndBind("Q5", "F0000", "F0001"); + AMQQueue q6 = createAndBind("Q6", "F0000=Aardvark", "F0001=Bear"); + AMQQueue q7 = createAndBind("Q7", "F0000", "F0001=Bear"); + AMQQueue q8 = createAndBind("Q8", "F0000=Aardvark", "F0001"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), + q1, q2, q3, q4, q5, q6, q7, q8); + routeAndTest(mockMessage(getArgsMapFromStrings("F0002"))); + } - - public void testOnUnbind() throws AMQException + + public void testAny() throws Exception { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3); - - unbind(q1,"F0000"); - routeAndTest(new Message(_protocolSession, "Message4", "F0000")); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2); + AMQQueue q1 = createAndBind("Q1", "F0000", "F0001", "X-match=any"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark", "F0001=Bear", "X-match=any"); + AMQQueue q3 = createAndBind("Q3", "F0000", "F0001=Bear", "X-match=any"); + AMQQueue q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any"); + AMQQueue q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0002"))); + } + + public void testOnUnbind() throws Exception + { + AMQQueue q1 = createAndBind("Q1", "F0000"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark"); + AMQQueue q3 = createAndBind("Q3", "F0001"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); + routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3); + + _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000")); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000"))); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2); + } + + + public void testWithSelectors() throws Exception + { + AMQQueue q1 = create("Q1"); + AMQQueue q2 = create("Q2"); + bind("q1",getArgsMapFromStrings("F"), q1); + bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1); + bind("q2",getArgsMapFromStrings("F=1"), q2); + + routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1); + + routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2); + + + AMQQueue q3 = create("Q3"); + bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1); + bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3); + + routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3); + + } + + private InboundMessage mockMessage(final Map<String, Object> headerValues) + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.containsHeader(anyString())).then(new Answer<Boolean>() + { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + return headerValues.containsKey((String) invocation.getArguments()[0]); + } + }); + when(header.getHeader(anyString())).then(new Answer<Object>() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + return headerValues.get((String) invocation.getArguments()[0]); + } + }); + when(header.getHeaderNames()).thenReturn(headerValues.keySet()); + when(header.containsHeaders(anySet())).then(new Answer<Boolean>() + { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + final Set names = (Set) invocation.getArguments()[0]; + return headerValues.keySet().containsAll(names); + + } + }); + final InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getMessageHeader()).thenReturn(header); + return inboundMessage; } - public static junit.framework.Test suite() { |
