diff options
Diffstat (limited to 'qpid/java')
2 files changed, 64 insertions, 4 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 9cba1b6977..b842c44a16 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -77,10 +77,46 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; + String routingAddress; + MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader(); + if(_initialRoutingAddress == null) + { + routingAddress = messageHeader.getSubject(); + if(routingAddress == null) + { + if (messageHeader.getHeader("routing-key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing-key"); + } + else if (messageHeader.getHeader("routing_key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing_key"); + } + else if (messageHeader.getTo() != null + && messageHeader.getTo().startsWith(_exchange.getName() + "/")) + { + routingAddress = messageHeader.getTo().substring(1+_exchange.getName().length()); + } + else + { + routingAddress = ""; + } + } + } + else + { + if (messageHeader.getTo() != null + && messageHeader.getTo().startsWith(_exchange.getName() + "/" + _initialRoutingAddress + "/")) + { + routingAddress = messageHeader.getTo().substring(2+_exchange.getName().length()+_initialRoutingAddress.length()); + } + else + { + routingAddress = _initialRoutingAddress; + } + } int enqueues = _exchange.send(message, - _initialRoutingAddress == null - ? message.getInitialRoutingAddress() - : _initialRoutingAddress, + routingAddress, instanceProperties, txn, null); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java index 9f0c1070d3..10058c1e0a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java @@ -76,7 +76,31 @@ public class NodeReceivingDestination implements ReceivingDestination return null; }}; - int enqueues = _destination.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); + String routingAddress; + MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader(); + routingAddress = messageHeader.getSubject(); + if(routingAddress == null) + { + if (messageHeader.getHeader("routing-key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing-key"); + } + else if (messageHeader.getHeader("routing_key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing_key"); + } + else if (messageHeader.getTo() != null + && messageHeader.getTo().startsWith(_destination.getName() + "/")) + { + routingAddress = messageHeader.getTo().substring(1+_destination.getName().length()); + } + else + { + routingAddress = ""; + } + } + + int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; |
