diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-05 16:04:16 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-05 16:04:16 +0000 |
| commit | 6713bfc5ddc1ff6202dad0d950a252273f73f795 (patch) | |
| tree | d789ac52d18fdc493f5d7e1731384c43cbfde9f1 /qpid/java/broker-core | |
| parent | 58c93e3b5e6c2227cc0018720a8781b25ec0d288 (diff) | |
| download | qpid-python-6713bfc5ddc1ff6202dad0d950a252273f73f795.tar.gz | |
QPID-4000 , QPID-5601 : Improve conversion of reply-to between different protocols. Add functionality to the default exchange to understand AMQP 1.0 addresses.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574551 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
16 files changed, 76 insertions, 39 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 2a688f497a..e01f4b7db9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -423,11 +423,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> final List<? extends BaseQueue> route(final ServerMessage message, + final String routingAddress, final InstanceProperties instanceProperties) { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - List<? extends BaseQueue> queues = doRoute(message, instanceProperties); + List<? extends BaseQueue> queues = doRoute(message, routingAddress, instanceProperties); List<? extends BaseQueue> allQueues = queues; boolean deletedQueues = false; @@ -464,18 +465,19 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance> postEnqueueAction) { - List<? extends BaseQueue> queues = route(message, instanceProperties); + List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties); if(queues == null || queues.isEmpty()) { ExchangeImpl altExchange = getAlternateExchange(); if(altExchange != null) { - return altExchange.send(message, instanceProperties, txn, postEnqueueAction); + return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction); } else { @@ -515,6 +517,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message, + final String routingAddress, final InstanceProperties instanceProperties); @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java index f59049d276..123a4f0a63 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java @@ -50,13 +50,31 @@ public class DefaultDestination implements MessageDestination public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance> postEnqueueAction) { - final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); + final AMQQueue q = _virtualHost.getQueue(routingAddress); if(q == null) { + if(routingAddress.contains("/") && !routingAddress.startsWith("/")) + { + String[] parts = routingAddress.split("/",2); + ExchangeImpl exchange = _virtualHost.getExchange(parts[0]); + if(exchange != null) + { + return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction); + } + } + else if(!routingAddress.contains("/")) + { + ExchangeImpl exchange = _virtualHost.getExchange(routingAddress); + if(exchange != null) + { + return exchange.send(message, "", instanceProperties, txn, postEnqueueAction); + } + } return 0; } else diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 0e8cfb72a1..a67cacf821 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -143,11 +143,11 @@ public class DirectExchange extends AbstractExchange<DirectExchange> } @Override - public List<? extends BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public List<? extends BaseQueue> doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { - final String routingKey = payload.getRoutingKey(); - BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey); if(bindings != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index c7f81f1d15..b7810e8112 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -79,7 +79,9 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> } @Override - public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList<BaseQueue> doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { for(BindingImpl b : getBindings()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 60df38af55..9d3ce0a415 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -93,7 +93,9 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange> } @Override - public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList<BaseQueue> doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index e7236bdf3e..db73e842b8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -157,12 +157,14 @@ public class TopicExchange extends AbstractExchange<TopicExchange> } @Override - public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList<BaseQueue> doRoute(ServerMessage payload, + final String routingAddress, + final InstanceProperties instanceProperties) { - final String routingKey = payload.getRoutingKey() == null + final String routingKey = routingAddress == null ? "" - : payload.getRoutingKey(); + : routingAddress; final Collection<AMQQueue> matchedQueues = getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey); @@ -181,7 +183,7 @@ public class TopicExchange extends AbstractExchange<TopicExchange> if(queues == null || queues.isEmpty()) { - _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); + _logger.info("Message routing key: " + routingAddress + " No routes."); } return queues; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 155f209ffb..1913f11ae1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -32,14 +32,18 @@ public interface MessageDestination extends MessageNode /** * Routes a message + * + * * @param message the message to be routed + * @param routingAddress * @param instanceProperties the instance properties * @param txn the transaction to enqueue within * @param postEnqueueAction action to perform on the result of every enqueue (may be null) * @return the number of queues in which the message was enqueued performed */ <M extends ServerMessage<? extends StorableMessageMetaData>> int send(M message, - InstanceProperties instanceProperties, - ServerTransaction txn, - Action<? super MessageInstance> postEnqueueAction); + final String routingAddress, + InstanceProperties instanceProperties, + ServerTransaction txn, + Action<? super MessageInstance> postEnqueueAction); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 28491edaba..8c35af8be4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -27,7 +27,7 @@ import java.nio.ByteBuffer; public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource { - String getRoutingKey(); + String getInitialRoutingAddress(); AMQMessageHeader getMessageHeader(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 6375cfb07d..fdc2fa90a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -44,6 +44,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, private final Object _messageBody; private final int _contentSize; private InternalMessageHeader _header; + private String _initialRoutingAddress; InternalMessage(final StoredMessage<InternalMessageMetaData> handle, @@ -80,9 +81,9 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, } @Override - public String getRoutingKey() + public String getInitialRoutingAddress() { - return null; + return _initialRoutingAddress; } @Override @@ -253,4 +254,8 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, } + public void setInitialRoutingAddress(final String initialRoutingAddress) + { + _initialRoutingAddress = initialRoutingAddress; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 62634970a6..11eb0b8a19 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -70,7 +70,6 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.NotificationListener; import javax.security.auth.Subject; public abstract class AbstractQueue @@ -2465,9 +2464,10 @@ public abstract class AbstractQueue } public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance> postEnqueueAction) { txn.enqueue(this,message, new ServerTransaction.Action() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 9814431beb..91148b1dc0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -377,6 +377,7 @@ public abstract class QueueEntryImpl implements QueueEntry if (alternateExchange != null) { enqueues = alternateExchange.send(getMessage(), + getMessage().getInitialRoutingAddress(), getInstanceProperties(), txn, action); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index be0704aeaa..fa75d41810 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -127,7 +127,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key",queue2, null); - List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + List<? extends BaseQueue> result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -136,7 +136,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); - result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -144,14 +144,14 @@ public class FanoutExchangeTest extends TestCase _exchange.deleteBinding("key",queue2); - result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); - result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to queue1 only", 1, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -160,7 +160,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); - result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index e4e07813c7..76752de5d0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -73,7 +73,7 @@ public class HeadersExchangeTest extends TestCase protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception { - List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY); + List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY); List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results); unexpected.removeAll(Arrays.asList(expected)); assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 1c191b7b2e..21aa171551 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -324,8 +324,8 @@ public class TopicExchangeTest extends QpidTestCase private int routeMessage(String routingKey, long messageNumber) { ServerMessage message = mock(ServerMessage.class); - when(message.getRoutingKey()).thenReturn(routingKey); - List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY); + when(message.getInitialRoutingAddress()).thenReturn(routingKey); + List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index bd43100cd2..5622383f3f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -119,7 +119,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM } @Override - public String getRoutingKey() + public String getInitialRoutingAddress() { return null; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 3b74110a6e..8992cf62c9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -67,7 +67,7 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } - public String getRoutingKey() + public String getInitialRoutingAddress() { throw new NotImplementedException(); } |
