summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-30 10:36:56 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-30 10:36:56 +0000
commit3eef81f1fd50e950c2230b05d973ae7a7e2091e8 (patch)
treebdcddfcdb012748b0a9fc976b3bfd84052d1b70f /qpid/java/broker-plugins
parentd6cff4d44a2530691e434fdbd81fb4721fa3b16e (diff)
downloadqpid-python-3eef81f1fd50e950c2230b05d973ae7a7e2091e8.tar.gz
QPID-6061 : [Java Broker] Use subject as the proxy for routing key for 1.0 messages sent directly to an exchange of type direct or topic
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1621443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java42
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java26
2 files changed, 64 insertions, 4 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 9cba1b6977..b842c44a16 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -77,10 +77,46 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
return null;
}};
+ String routingAddress;
+ MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
+ if(_initialRoutingAddress == null)
+ {
+ routingAddress = messageHeader.getSubject();
+ if(routingAddress == null)
+ {
+ if (messageHeader.getHeader("routing-key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing-key");
+ }
+ else if (messageHeader.getHeader("routing_key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing_key");
+ }
+ else if (messageHeader.getTo() != null
+ && messageHeader.getTo().startsWith(_exchange.getName() + "/"))
+ {
+ routingAddress = messageHeader.getTo().substring(1+_exchange.getName().length());
+ }
+ else
+ {
+ routingAddress = "";
+ }
+ }
+ }
+ else
+ {
+ if (messageHeader.getTo() != null
+ && messageHeader.getTo().startsWith(_exchange.getName() + "/" + _initialRoutingAddress + "/"))
+ {
+ routingAddress = messageHeader.getTo().substring(2+_exchange.getName().length()+_initialRoutingAddress.length());
+ }
+ else
+ {
+ routingAddress = _initialRoutingAddress;
+ }
+ }
int enqueues = _exchange.send(message,
- _initialRoutingAddress == null
- ? message.getInitialRoutingAddress()
- : _initialRoutingAddress,
+ routingAddress,
instanceProperties,
txn,
null);
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 9f0c1070d3..10058c1e0a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -76,7 +76,31 @@ public class NodeReceivingDestination implements ReceivingDestination
return null;
}};
- int enqueues = _destination.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
+ String routingAddress;
+ MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
+ routingAddress = messageHeader.getSubject();
+ if(routingAddress == null)
+ {
+ if (messageHeader.getHeader("routing-key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing-key");
+ }
+ else if (messageHeader.getHeader("routing_key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing_key");
+ }
+ else if (messageHeader.getTo() != null
+ && messageHeader.getTo().startsWith(_destination.getName() + "/"))
+ {
+ routingAddress = messageHeader.getTo().substring(1+_destination.getName().length());
+ }
+ else
+ {
+ routingAddress = "";
+ }
+ }
+
+ int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null);
return enqueues == 0 ? REJECTED : ACCEPTED;