diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-30 10:36:56 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-30 10:36:56 +0000 |
| commit | 3eef81f1fd50e950c2230b05d973ae7a7e2091e8 (patch) | |
| tree | bdcddfcdb012748b0a9fc976b3bfd84052d1b70f /qpid/java/broker-plugins | |
| parent | d6cff4d44a2530691e434fdbd81fb4721fa3b16e (diff) | |
| download | qpid-python-3eef81f1fd50e950c2230b05d973ae7a7e2091e8.tar.gz | |
QPID-6061 : [Java Broker] Use subject as the proxy for routing key for 1.0 messages sent directly to an exchange of type direct or topic
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1621443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
2 files changed, 64 insertions, 4 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 9cba1b6977..b842c44a16 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -77,10 +77,46 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; + String routingAddress; + MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader(); + if(_initialRoutingAddress == null) + { + routingAddress = messageHeader.getSubject(); + if(routingAddress == null) + { + if (messageHeader.getHeader("routing-key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing-key"); + } + else if (messageHeader.getHeader("routing_key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing_key"); + } + else if (messageHeader.getTo() != null + && messageHeader.getTo().startsWith(_exchange.getName() + "/")) + { + routingAddress = messageHeader.getTo().substring(1+_exchange.getName().length()); + } + else + { + routingAddress = ""; + } + } + } + else + { + if (messageHeader.getTo() != null + && messageHeader.getTo().startsWith(_exchange.getName() + "/" + _initialRoutingAddress + "/")) + { + routingAddress = messageHeader.getTo().substring(2+_exchange.getName().length()+_initialRoutingAddress.length()); + } + else + { + routingAddress = _initialRoutingAddress; + } + } int enqueues = _exchange.send(message, - _initialRoutingAddress == null - ? message.getInitialRoutingAddress() - : _initialRoutingAddress, + routingAddress, instanceProperties, txn, null); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java index 9f0c1070d3..10058c1e0a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java @@ -76,7 +76,31 @@ public class NodeReceivingDestination implements ReceivingDestination return null; }}; - int enqueues = _destination.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); + String routingAddress; + MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader(); + routingAddress = messageHeader.getSubject(); + if(routingAddress == null) + { + if (messageHeader.getHeader("routing-key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing-key"); + } + else if (messageHeader.getHeader("routing_key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing_key"); + } + else if (messageHeader.getTo() != null + && messageHeader.getTo().startsWith(_destination.getName() + "/")) + { + routingAddress = messageHeader.getTo().substring(1+_destination.getName().length()); + } + else + { + routingAddress = ""; + } + } + + int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; |
