summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-05 16:04:16 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-05 16:04:16 +0000
commit6713bfc5ddc1ff6202dad0d950a252273f73f795 (patch)
treed789ac52d18fdc493f5d7e1731384c43cbfde9f1 /qpid/java/broker-core
parent58c93e3b5e6c2227cc0018720a8781b25ec0d288 (diff)
downloadqpid-python-6713bfc5ddc1ff6202dad0d950a252273f73f795.tar.gz
QPID-4000 , QPID-5601 : Improve conversion of reply-to between different protocols. Add functionality to the default exchange to understand AMQP 1.0 addresses.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574551 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java2
16 files changed, 76 insertions, 39 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 2a688f497a..e01f4b7db9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -423,11 +423,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
final List<? extends BaseQueue> route(final ServerMessage message,
+ final String routingAddress,
final InstanceProperties instanceProperties)
{
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
- List<? extends BaseQueue> queues = doRoute(message, instanceProperties);
+ List<? extends BaseQueue> queues = doRoute(message, routingAddress, instanceProperties);
List<? extends BaseQueue> allQueues = queues;
boolean deletedQueues = false;
@@ -464,18 +465,19 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance> postEnqueueAction)
{
- List<? extends BaseQueue> queues = route(message, instanceProperties);
+ List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
if(queues == null || queues.isEmpty())
{
ExchangeImpl altExchange = getAlternateExchange();
if(altExchange != null)
{
- return altExchange.send(message, instanceProperties, txn, postEnqueueAction);
+ return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
}
else
{
@@ -515,6 +517,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
+ final String routingAddress,
final InstanceProperties instanceProperties);
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
index f59049d276..123a4f0a63 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
@@ -50,13 +50,31 @@ public class DefaultDestination implements MessageDestination
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance> postEnqueueAction)
{
- final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
+ final AMQQueue q = _virtualHost.getQueue(routingAddress);
if(q == null)
{
+ if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
+ {
+ String[] parts = routingAddress.split("/",2);
+ ExchangeImpl exchange = _virtualHost.getExchange(parts[0]);
+ if(exchange != null)
+ {
+ return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction);
+ }
+ }
+ else if(!routingAddress.contains("/"))
+ {
+ ExchangeImpl exchange = _virtualHost.getExchange(routingAddress);
+ if(exchange != null)
+ {
+ return exchange.send(message, "", instanceProperties, txn, postEnqueueAction);
+ }
+ }
return 0;
}
else
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 0e8cfb72a1..a67cacf821 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -143,11 +143,11 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
}
@Override
- public List<? extends BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+ public List<? extends BaseQueue> doRoute(ServerMessage payload,
+ final String routingKey,
+ final InstanceProperties instanceProperties)
{
- final String routingKey = payload.getRoutingKey();
-
BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
if(bindings != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index c7f81f1d15..b7810e8112 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -79,7 +79,9 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+ final String routingKey,
+ final InstanceProperties instanceProperties)
{
for(BindingImpl b : getBindings())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 60df38af55..9d3ce0a415 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -93,7 +93,9 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange>
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+ final String routingKey,
+ final InstanceProperties instanceProperties)
{
if (_logger.isDebugEnabled())
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index e7236bdf3e..db73e842b8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -157,12 +157,14 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+ final String routingAddress,
+ final InstanceProperties instanceProperties)
{
- final String routingKey = payload.getRoutingKey() == null
+ final String routingKey = routingAddress == null
? ""
- : payload.getRoutingKey();
+ : routingAddress;
final Collection<AMQQueue> matchedQueues =
getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey);
@@ -181,7 +183,7 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
if(queues == null || queues.isEmpty())
{
- _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
+ _logger.info("Message routing key: " + routingAddress + " No routes.");
}
return queues;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index 155f209ffb..1913f11ae1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -32,14 +32,18 @@ public interface MessageDestination extends MessageNode
/**
* Routes a message
+ *
+ *
* @param message the message to be routed
+ * @param routingAddress
* @param instanceProperties the instance properties
* @param txn the transaction to enqueue within
* @param postEnqueueAction action to perform on the result of every enqueue (may be null)
* @return the number of queues in which the message was enqueued performed
*/
<M extends ServerMessage<? extends StorableMessageMetaData>> int send(M message,
- InstanceProperties instanceProperties,
- ServerTransaction txn,
- Action<? super MessageInstance> postEnqueueAction);
+ final String routingAddress,
+ InstanceProperties instanceProperties,
+ ServerTransaction txn,
+ Action<? super MessageInstance> postEnqueueAction);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 28491edaba..8c35af8be4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource
{
- String getRoutingKey();
+ String getInitialRoutingAddress();
AMQMessageHeader getMessageHeader();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 6375cfb07d..fdc2fa90a5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -44,6 +44,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
private final Object _messageBody;
private final int _contentSize;
private InternalMessageHeader _header;
+ private String _initialRoutingAddress;
InternalMessage(final StoredMessage<InternalMessageMetaData> handle,
@@ -80,9 +81,9 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
@Override
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
- return null;
+ return _initialRoutingAddress;
}
@Override
@@ -253,4 +254,8 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
+ public void setInitialRoutingAddress(final String initialRoutingAddress)
+ {
+ _initialRoutingAddress = initialRoutingAddress;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 62634970a6..11eb0b8a19 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -70,7 +70,6 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.NotificationListener;
import javax.security.auth.Subject;
public abstract class AbstractQueue
@@ -2465,9 +2464,10 @@ public abstract class AbstractQueue
}
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance> postEnqueueAction)
{
txn.enqueue(this,message, new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 9814431beb..91148b1dc0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -377,6 +377,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if (alternateExchange != null)
{
enqueues = alternateExchange.send(getMessage(),
+ getMessage().getInitialRoutingAddress(),
getInstanceProperties(),
txn,
action);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index be0704aeaa..fa75d41810 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -127,7 +127,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key",queue2, null);
- List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+ List<? extends BaseQueue> result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -136,7 +136,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
- result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+ result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -144,14 +144,14 @@ public class FanoutExchangeTest extends TestCase
_exchange.deleteBinding("key",queue2);
- result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+ result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
- result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
+ result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to queue1 only", 1, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -160,7 +160,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
- result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
+ result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index e4e07813c7..76752de5d0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -73,7 +73,7 @@ public class HeadersExchangeTest extends TestCase
protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception
{
- List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY);
+ List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY);
List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
unexpected.removeAll(Arrays.asList(expected));
assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 1c191b7b2e..21aa171551 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -324,8 +324,8 @@ public class TopicExchangeTest extends QpidTestCase
private int routeMessage(String routingKey, long messageNumber)
{
ServerMessage message = mock(ServerMessage.class);
- when(message.getRoutingKey()).thenReturn(routingKey);
- List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY);
+ when(message.getInitialRoutingAddress()).thenReturn(routingKey);
+ List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index bd43100cd2..5622383f3f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -119,7 +119,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
}
@Override
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
return null;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 3b74110a6e..8992cf62c9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -67,7 +67,7 @@ class MockServerMessage implements ServerMessage
throw new NotImplementedException();
}
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
throw new NotImplementedException();
}