summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-06-01 19:24:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-06-01 19:24:36 +0000
commit3e4d1f2f56ef296ea5132511faaa8689867c499c (patch)
tree6c990d7d04cdaf07bd6dace7c157b882f8370cbf /qpid/java/broker/src/test
parentf6b68fa2e1ca27d46e6080a3568ef5d785eed548 (diff)
downloadqpid-python-3e4d1f2f56ef296ea5132511faaa8689867c499c.tar.gz
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java633
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java97
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java276
3 files changed, 293 insertions, 713 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
deleted file mode 100644
index f4c0fec6c9..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MockStoredMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AbstractHeadersExchangeTestBase extends QpidTestCase
-{
- private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
-
- private final HeadersExchange exchange = new HeadersExchange();
- private final Set<TestQueue> queues = new HashSet<TestQueue>();
- private VirtualHost _virtualHost;
- private int count;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_virtualHost != null)
- {
- _virtualHost.close();
- }
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- public void testDoNothing()
- {
- // this is here only to make junit under Eclipse happy
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- protected TestQueue bindDefault(String... bindings) throws AMQException
- {
- String queueName = "Queue" + (++count);
-
- return bind(queueName, queueName, getHeadersMap(bindings));
- }
-
- protected void unbind(TestQueue queue, String... bindings) throws AMQException
- {
- String queueName = queue.getName();
- exchange.onUnbind(new Binding(null, queueName, queue, exchange, getHeadersMap(bindings)));
- }
-
- protected int getCount()
- {
- return count;
- }
-
- private TestQueue bind(String key, String queueName, Map<String,Object> args) throws AMQException
- {
- TestQueue queue = new TestQueue(new AMQShortString(queueName), _virtualHost);
- queues.add(queue);
- exchange.onBind(new Binding(null, key, queue, exchange, args));
- return queue;
- }
-
-
- protected int route(Message m) throws AMQException
- {
- m.getIncomingMessage().headersReceived(System.currentTimeMillis());
- m.route(exchange);
- if(m.getIncomingMessage().allContentReceived())
- {
- for(BaseQueue q : m.getIncomingMessage().getDestinationQueues())
- {
- q.enqueue(m);
- }
- }
- return m.getIncomingMessage().getDestinationQueues().size();
- }
-
- protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, false, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, expectReturn, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
- {
- routeAndTest(m, false, expected);
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
- {
- int queueCount = route(m);
-
- for (TestQueue q : queues)
- {
- if (expected.contains(q))
- {
- assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
- }
- }
-
- if(expectReturn)
- {
- assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount);
- }
-
- }
-
- static Map<String,Object> getHeadersMap(String... entries)
- {
- if(entries == null)
- {
- return null;
- }
-
- Map<String,Object> headers = new HashMap<String,Object>();
-
- for (String s : entries)
- {
- String[] parts = s.split("=", 2);
- headers.put(parts[0], parts.length > 1 ? parts[1] : "");
- }
- return headers;
- }
-
- static FieldTable getHeaders(String... entries)
- {
- FieldTable headers = FieldTableFactory.newFieldTable();
- for (String s : entries)
- {
- String[] parts = s.split("=", 2);
- headers.setObject(parts[0], parts.length > 1 ? parts[1] : "");
- }
- return headers;
- }
-
-
- static final class MessagePublishInfoImpl implements MessagePublishInfo
- {
- private AMQShortString _exchange;
- private boolean _immediate;
- private boolean _mandatory;
- private AMQShortString _routingKey;
-
- public MessagePublishInfoImpl(AMQShortString routingKey)
- {
- _routingKey = routingKey;
- }
-
- public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
- {
- _exchange = exchange;
- _immediate = immediate;
- _mandatory = mandatory;
- _routingKey = routingKey;
- }
-
- public AMQShortString getExchange()
- {
- return _exchange;
- }
-
- public boolean isImmediate()
- {
- return _immediate;
-
- }
-
- public boolean isMandatory()
- {
- return _mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
-
- public void setExchange(AMQShortString exchange)
- {
- _exchange = exchange;
- }
-
- public void setImmediate(boolean immediate)
- {
- _immediate = immediate;
- }
-
- public void setMandatory(boolean mandatory)
- {
- _mandatory = mandatory;
- }
-
- public void setRoutingKey(AMQShortString routingKey)
- {
- _routingKey = routingKey;
- }
- }
-
- static MessagePublishInfo getPublishRequest(final String id)
- {
- return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id));
- }
-
- static ContentHeaderBody getContentHeader(FieldTable headers)
- {
- ContentHeaderBody header = new ContentHeaderBody();
- header.setProperties(getProperties(headers));
- return header;
- }
-
- static BasicContentHeaderProperties getProperties(FieldTable headers)
- {
- BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
- properties.setHeaders(headers);
- return properties;
- }
-
- static class TestQueue extends SimpleAMQQueue
- {
- private final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
-
- public String toString()
- {
- return getNameShortString().toString();
- }
-
- public TestQueue(AMQShortString name, VirtualHost host) throws AMQException
- {
- super(UUIDGenerator.generateRandomUUID(), name, false, new AMQShortString("test"), true, false, host, Collections.EMPTY_MAP);
- host.getQueueRegistry().registerQueue(this);
- }
-
-
-
- /**
- * We override this method so that the default behaviour, which attempts to use a delivery manager, is
- * not invoked. It is unnecessary since for this test we only care to know whether the message was
- * sent to the queue; the queue processing logic is not being tested.
- * @param msg
- * @throws AMQException
- */
- @Override
- public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException
- {
- messages.add( new HeadersExchangeTest.Message((AMQMessage) msg));
- final QueueEntry queueEntry = new QueueEntry()
- {
-
- public AMQQueue getQueue()
- {
- return null;
- }
-
- public AMQMessage getMessage()
- {
- return null;
- }
-
- public long getSize()
- {
- return 0;
- }
-
- public boolean getDeliveredToConsumer()
- {
- return false;
- }
-
- public boolean expired() throws AMQException
- {
- return false;
- }
-
- public boolean isAvailable()
- {
- return false;
- }
-
- public boolean isAcquired()
- {
- return false;
- }
-
- public boolean acquire()
- {
- return false;
- }
-
- public boolean acquire(Subscription sub)
- {
- return false;
- }
-
- public boolean delete()
- {
- return false;
- }
-
- public boolean isDeleted()
- {
- return false;
- }
-
- public boolean acquiredBySubscription()
- {
- return false;
- }
-
- public boolean isAcquiredBy(Subscription subscription)
- {
- return false;
- }
-
- public void release()
- {
-
- }
-
- public void setRedelivered()
- {
-
- }
-
- public AMQMessageHeader getMessageHeader()
- {
- return null;
- }
-
- public boolean isPersistent()
- {
- return false;
- }
-
- public boolean isRedelivered()
- {
- return false;
- }
-
- public Subscription getDeliveredSubscription()
- {
- return null;
- }
-
- public void reject()
- {
-
- }
-
- public boolean isRejectedBy(long subscriptionId)
- {
- return false;
- }
-
- public void dequeue()
- {
-
- }
-
- public void dispose()
- {
-
- }
-
- public void discard()
- {
-
- }
-
- public void routeToAlternate()
- {
-
- }
-
- public boolean isQueueDeleted()
- {
- return false;
- }
-
- public void addStateChangeListener(StateChangeListener listener)
- {
-
- }
-
- public boolean removeStateChangeListener(StateChangeListener listener)
- {
- return false;
- }
-
- public int compareTo(final QueueEntry o)
- {
- return 0;
- }
-
- public boolean isDequeued()
- {
- return false;
- }
-
- public boolean isDispensed()
- {
- return false;
- }
-
- public QueueEntry getNextNode()
- {
- return null;
- }
-
- public QueueEntry getNextValidEntry()
- {
- return null;
- }
-
- public int getDeliveryCount()
- {
- return 0;
- }
-
- public void incrementDeliveryCount()
- {
- }
-
- public void decrementDeliveryCount()
- {
- }
- };
-
- if(action != null)
- {
- action.onEnqueue(queueEntry);
- }
-
- }
-
- boolean isInQueue(Message msg)
- {
- return messages.contains(msg);
- }
-
- }
-
- /**
- * Just add some extra utility methods to AMQMessage to aid testing.
- */
- static class Message extends AMQMessage
- {
- private static AtomicLong _messageId = new AtomicLong();
-
- private class TestIncomingMessage extends IncomingMessage
- {
-
- public TestIncomingMessage(final long messageId,
- final MessagePublishInfo info,
- final AMQProtocolSession publisher)
- {
- super(info);
- }
-
-
- public ContentHeaderBody getContentHeader()
- {
- return Message.this.getContentHeaderBody();
- }
- }
-
- private IncomingMessage _incoming;
-
-
- Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
- {
- this(protocolSession, id, getHeaders(headers));
- }
-
- Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
- {
- this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST);
- }
-
- public IncomingMessage getIncomingMessage()
- {
- return _incoming;
- }
-
- private Message(AMQProtocolSession protocolsession, long messageId,
- MessagePublishInfo publish,
- ContentHeaderBody header,
- List<ContentBody> bodies) throws AMQException
- {
- super(new MockStoredMessage(messageId, publish, header));
-
- StoredMessage<MessageMetaData> storedMessage = getStoredMessage();
-
- int pos = 0;
- for(ContentBody body : bodies)
- {
- storedMessage.addContent(pos, ByteBuffer.wrap(body.getPayload()));
- pos += body.getPayload().length;
- }
-
- _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
- _incoming.setContentHeaderBody(header);
-
-
- }
-
-
- private Message(AMQMessage msg) throws AMQException
- {
- super(msg.getStoredMessage());
- }
-
-
-
- void route(Exchange exchange) throws AMQException
- {
- _incoming.enqueue(exchange.route(_incoming));
- }
-
-
- public int hashCode()
- {
- return getKey().hashCode();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
- }
-
- private boolean equals(HeadersExchangeTest.Message m)
- {
- return getKey().equals(m.getKey());
- }
-
- public String toString()
- {
- return getKey().toString();
- }
-
- private Object getKey()
- {
- try
- {
- return getMessagePublishInfo().getRoutingKey();
- }
- catch (AMQException e)
- {
- _log.error("Error getting routing key: " + e, e);
- return null;
- }
- }
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 2ddb417d5d..7b7e2ec346 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -21,22 +21,32 @@
package org.apache.qpid.server.exchange;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import java.util.UUID;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class FanoutExchangeTest extends TestCase
{
@@ -51,7 +61,9 @@ public class FanoutExchangeTest extends TestCase
_virtualHost = mock(VirtualHost.class);
SecurityManager securityManager = mock(SecurityManager.class);
when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
- when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
+
_exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
}
@@ -76,14 +88,14 @@ public class FanoutExchangeTest extends TestCase
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
- _exchange.isBound((AMQShortString) null, (FieldTable) null, queue));
+ _exchange.isBound(new AMQShortString("matters"), (FieldTable) null, queue));
}
public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
- _exchange.isBound((AMQShortString) null, queue));
+ _exchange.isBound(new AMQShortString("matters"), queue));
}
public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException
@@ -95,9 +107,86 @@ public class FanoutExchangeTest extends TestCase
private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException
{
+ AMQQueue queue = mockQueue();
+ _exchange.addBinding("matters", queue, null);
+ return queue;
+ }
+
+ private AMQQueue mockQueue()
+ {
AMQQueue queue = mock(AMQQueue.class);
when(queue.getVirtualHost()).thenReturn(_virtualHost);
- _exchange.addBinding("does not matter", queue, null);
return queue;
}
+
+ public void testRoutingWithSelectors() throws Exception
+ {
+ AMQQueue queue1 = mockQueue();
+ AMQQueue queue2 = mockQueue();
+
+ _exchange.addBinding("key",queue1, null);
+ _exchange.addBinding("key",queue2, null);
+
+
+ List<? extends BaseQueue> result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+ _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
+
+
+ result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+ _exchange.removeBinding("key",queue2,null);
+
+ result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+
+ result = _exchange.route(mockMessage(false));
+
+ assertEquals("Expected message to be routed to queue1 only", 1, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertFalse("Expected queue2 not to be routed to", result.contains(queue2));
+
+ _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
+
+
+ result = _exchange.route(mockMessage(false));
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+
+ }
+
+ private InboundMessage mockMessage(boolean val)
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.containsHeader("select")).thenReturn(true);
+ when(header.getHeader("select")).thenReturn(val);
+ when(header.getHeaderNames()).thenReturn(Collections.singleton("select"));
+ when(header.containsHeaders(anySet())).then(new Answer<Object>()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Set names = (Set) invocation.getArguments()[0];
+ return names.size() == 1 && names.contains("select");
+
+ }
+ });
+ final InboundMessage inboundMessage = mock(InboundMessage.class);
+ when(inboundMessage.getMessageHeader()).thenReturn(header);
+ return inboundMessage;
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index bd6a02d69b..2b965358e0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -20,106 +20,230 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.util.BrokerTestHelper;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import junit.framework.TestCase;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
-public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class HeadersExchangeTest extends TestCase
{
- private AMQProtocolSession _protocolSession;
+ private HeadersExchange _exchange;
+ private VirtualHost _virtualHost;
@Override
public void setUp() throws Exception
{
super.setUp();
- BrokerTestHelper.setUp();
- _protocolSession = new InternalTestProtocolSession(getVirtualHost(), BrokerTestHelper.createBrokerMock());
+
+ CurrentActor.setDefault(mock(LogActor.class));
+ _exchange = new HeadersExchange();
+ _virtualHost = mock(VirtualHost.class);
+ SecurityManager securityManager = mock(SecurityManager.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
+ when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
+
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
+
}
- @Override
- public void tearDown() throws Exception
+ protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception
{
- BrokerTestHelper.tearDown();
- super.tearDown();
+ List<? extends BaseQueue> results = _exchange.route(msg);
+ List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
+ unexpected.removeAll(Arrays.asList(expected));
+ assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
+ List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected));
+ missing.removeAll(results);
+ assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty());
+ assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size());
}
- public void testSimple() throws AMQException
+
+ private AMQQueue createAndBind(final String name, String... arguments)
+ throws Exception
{
- TestQueue q1 = bindDefault("F0000");
- TestQueue q2 = bindDefault("F0000=Aardvark");
- TestQueue q3 = bindDefault("F0001");
- TestQueue q4 = bindDefault("F0001=Bear");
- TestQueue q5 = bindDefault("F0000", "F0001");
- TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
- TestQueue q7 = bindDefault("F0000", "F0001=Bear");
- TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
- routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"),
- q1, q2, q3, q4, q5, q6, q7, q8);
- routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
-
- Message m7 = new Message(_protocolSession, "Message7", "XXXXX");
-
- MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
- pb7.setMandatory(true);
- routeAndTest(m7,true);
-
- Message m8 = new Message(_protocolSession, "Message8", "F0000");
- MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
- pb8.setMandatory(true);
- routeAndTest(m8,false,q1);
+ return createAndBind(name, getArgsMapFromStrings(arguments));
+ }
+
+ private Map<String, Object> getArgsMapFromStrings(String... arguments)
+ {
+ Map<String, Object> map = new HashMap<String,Object>();
+
+ for(String arg : arguments)
+ {
+ if(arg.contains("="))
+ {
+ String[] keyValue = arg.split("=",2);
+ map.put(keyValue[0],keyValue[1]);
+ }
+ else
+ {
+ map.put(arg,null);
+ }
+ }
+ return map;
+ }
+ private AMQQueue createAndBind(final String name, Map<String, Object> arguments)
+ throws Exception
+ {
+ AMQQueue q = create(name);
+ bind(name, arguments, q);
+ return q;
+ }
+ private void bind(String bindingKey, Map<String, Object> arguments, AMQQueue q)
+ throws AMQSecurityException, AMQInternalException
+ {
+ _exchange.addBinding(bindingKey,q,arguments);
}
- public void testAny() throws AMQException
+ private AMQQueue create(String name)
{
- TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
- TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
- TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
- TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
- TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4);
- routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
+ AMQQueue q = mock(AMQQueue.class);
+ when(q.toString()).thenReturn(name);
+ when(q.getVirtualHost()).thenReturn(_virtualHost);
+ return q;
}
- public void testMandatory() throws AMQException
+
+ public void testSimple() throws Exception
{
- bindDefault("F0000");
- Message m1 = new Message(_protocolSession, "Message1", "XXXXX");
- Message m2 = new Message(_protocolSession, "Message2", "F0000");
- MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
- pb1.setMandatory(true);
- MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
- pb2.setMandatory(true);
- routeAndTest(m1,true);
+ AMQQueue q1 = createAndBind("Q1", "F0000");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark");
+ AMQQueue q3 = createAndBind("Q3", "F0001");
+ AMQQueue q4 = createAndBind("Q4", "F0001=Bear");
+ AMQQueue q5 = createAndBind("Q5", "F0000", "F0001");
+ AMQQueue q6 = createAndBind("Q6", "F0000=Aardvark", "F0001=Bear");
+ AMQQueue q7 = createAndBind("Q7", "F0000", "F0001=Bear");
+ AMQQueue q8 = createAndBind("Q8", "F0000=Aardvark", "F0001");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")),
+ q1, q2, q3, q4, q5, q6, q7, q8);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+
}
-
- public void testOnUnbind() throws AMQException
+
+ public void testAny() throws Exception
{
- TestQueue q1 = bindDefault("F0000");
- TestQueue q2 = bindDefault("F0000=Aardvark");
- TestQueue q3 = bindDefault("F0001");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3);
-
- unbind(q1,"F0000");
- routeAndTest(new Message(_protocolSession, "Message4", "F0000"));
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2);
+ AMQQueue q1 = createAndBind("Q1", "F0000", "F0001", "X-match=any");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark", "F0001=Bear", "X-match=any");
+ AMQQueue q3 = createAndBind("Q3", "F0000", "F0001=Bear", "X-match=any");
+ AMQQueue q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any");
+ AMQQueue q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+ }
+
+ public void testOnUnbind() throws Exception
+ {
+ AMQQueue q1 = createAndBind("Q1", "F0000");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark");
+ AMQQueue q3 = createAndBind("Q3", "F0001");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3);
+
+ _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000"));
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")));
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
+ }
+
+
+ public void testWithSelectors() throws Exception
+ {
+ AMQQueue q1 = create("Q1");
+ AMQQueue q2 = create("Q2");
+ bind("q1",getArgsMapFromStrings("F"), q1);
+ bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1);
+ bind("q2",getArgsMapFromStrings("F=1"), q2);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2);
+
+
+ AMQQueue q3 = create("Q3");
+ bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1);
+ bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3);
+
+ }
+
+ private InboundMessage mockMessage(final Map<String, Object> headerValues)
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.containsHeader(anyString())).then(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ return headerValues.containsKey((String) invocation.getArguments()[0]);
+ }
+ });
+ when(header.getHeader(anyString())).then(new Answer<Object>()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ return headerValues.get((String) invocation.getArguments()[0]);
+ }
+ });
+ when(header.getHeaderNames()).thenReturn(headerValues.keySet());
+ when(header.containsHeaders(anySet())).then(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Set names = (Set) invocation.getArguments()[0];
+ return headerValues.keySet().containsAll(names);
+
+ }
+ });
+ final InboundMessage inboundMessage = mock(InboundMessage.class);
+ when(inboundMessage.getMessageHeader()).thenReturn(header);
+ return inboundMessage;
}
-
public static junit.framework.Test suite()
{