diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-05 17:33:23 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-05 17:33:23 +0000 |
commit | b419785066a84a18b45ccf88c09cf36cda01f8e2 (patch) | |
tree | 2e68cdcf60ebb6fa76356c22136beb04c53fc732 | |
parent | 3b47a636912b9981b4e75f751337fc60f52201f4 (diff) | |
download | qpid-python-b419785066a84a18b45ccf88c09cf36cda01f8e2.tar.gz |
QPID-5605 : [Java Broker] [AMQP 1.0] allow use of addresses of the form <exchange>/<routing-key>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574582 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 96 insertions, 28 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index fc2c0d93d0..9cba1b6977 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -38,6 +38,7 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina private ExchangeImpl _exchange; private TerminusDurability _durability; private TerminusExpiryPolicy _expiryPolicy; + private String _initialRoutingAddress; public ExchangeDestination(ExchangeImpl exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) { @@ -76,7 +77,13 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; - int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); + int enqueues = _exchange.send(message, + _initialRoutingAddress == null + ? message.getInitialRoutingAddress() + : _initialRoutingAddress, + instanceProperties, + txn, + null); return enqueues == 0 ? REJECTED : ACCEPTED; @@ -102,4 +109,14 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina { return _exchange; } + + public void setInitialRoutingAddress(final String initialRoutingAddress) + { + _initialRoutingAddress = initialRoutingAddress; + } + + public String getInitialRoutingAddress() + { + return _initialRoutingAddress; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java index dedb8a3dc0..9f0c1070d3 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java @@ -35,13 +35,13 @@ public class NodeReceivingDestination implements ReceivingDestination public static final Rejected REJECTED = new Rejected(); private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED}; - private MessageDestination _exchange; + private MessageDestination _destination; private TerminusDurability _durability; private TerminusExpiryPolicy _expiryPolicy; - public NodeReceivingDestination(MessageDestination exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) + public NodeReceivingDestination(MessageDestination destination, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) { - _exchange = exchange; + _destination = destination; _durability = durable; _expiryPolicy = expiryPolicy; } @@ -76,7 +76,7 @@ public class NodeReceivingDestination implements ReceivingDestination return null; }}; - int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); + int enqueues = _destination.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; @@ -100,6 +100,6 @@ public class NodeReceivingDestination implements ReceivingDestination public MessageDestination getDestination() { - return _exchange; + return _destination; } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 394ab69990..24395a6fad 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -230,7 +230,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } - String binding = ""; + String binding = null; Map<Symbol,Filter> filters = source.getFilter(); Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>(); @@ -298,8 +298,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } _queue = queue; source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - - exchange.addBinding(binding, queue,null); + if(binding != null) + { + exchange.addBinding(binding, queue,null); + } + if(exchangeDestination.getInitialRoutingAddress() != null) + { + exchange.addBinding(exchangeDestination.getInitialRoutingAddress(),queue,null); + } source.setDistributionMode(StdDistMode.COPY); qd = new QueueDestination(queue); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 78dcab9d75..411117be4d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -127,17 +127,17 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); - MessageSource queue = getVirtualHost().getMessageSource(addr); - if(queue != null) + if(!addr.startsWith("/") && addr.contains("/")) { - destination = new MessageSourceDestination(queue); - } - else - { - ExchangeImpl exchg = getVirtualHost().getExchange(addr); + String[] parts = addr.split("/",2); + ExchangeImpl exchg = getVirtualHost().getExchange(parts[0]); if(exchg != null) { - destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy()); + ExchangeDestination exchangeDestination = + new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy()); + exchangeDestination.setInitialRoutingAddress(parts[1]); + destination = exchangeDestination; + } else { @@ -145,6 +145,27 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio destination = null; } } + else + { + MessageSource queue = getVirtualHost().getMessageSource(addr); + if(queue != null) + { + destination = new MessageSourceDestination(queue); + } + else + { + ExchangeImpl exchg = getVirtualHost().getExchange(addr); + if(exchg != null) + { + destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy()); + } + else + { + endpoint.setSource(null); + destination = null; + } + } + } } else @@ -265,28 +286,52 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } String addr = target.getAddress(); - MessageDestination messageDestination = getVirtualHost().getMessageDestination(addr); - if(messageDestination != null) - { - destination = new NodeReceivingDestination(messageDestination, target.getDurable(), - target.getExpiryPolicy()); - } - else + if(!addr.startsWith("/") && addr.contains("/")) { - AMQQueue queue = getVirtualHost().getQueue(addr); - if(queue != null) + String[] parts = addr.split("/",2); + ExchangeImpl exchange = getVirtualHost().getExchange(parts[0]); + if(exchange != null) { + ExchangeDestination exchangeDestination = + new ExchangeDestination(exchange, + target.getDurable(), + target.getExpiryPolicy()); + + exchangeDestination.setInitialRoutingAddress(parts[1]); + + destination = exchangeDestination; - destination = new QueueDestination(queue); } else { endpoint.setTarget(null); destination = null; } - } + else + { + MessageDestination messageDestination = getVirtualHost().getMessageDestination(addr); + if(messageDestination != null) + { + destination = new NodeReceivingDestination(messageDestination, target.getDurable(), + target.getExpiryPolicy()); + } + else + { + AMQQueue queue = getVirtualHost().getQueue(addr); + if(queue != null) + { + + destination = new QueueDestination(queue); + } + else + { + endpoint.setTarget(null); + destination = null; + } + } + } } else |