From b419785066a84a18b45ccf88c09cf36cda01f8e2 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 5 Mar 2014 17:33:23 +0000 Subject: QPID-5605 : [Java Broker] [AMQP 1.0] allow use of addresses of the form / git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574582 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/protocol/v1_0/ExchangeDestination.java | 19 ++++- .../protocol/v1_0/NodeReceivingDestination.java | 10 +-- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 12 +++- .../qpid/server/protocol/v1_0/Session_1_0.java | 83 +++++++++++++++++----- 4 files changed, 96 insertions(+), 28 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index fc2c0d93d0..9cba1b6977 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -38,6 +38,7 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina private ExchangeImpl _exchange; private TerminusDurability _durability; private TerminusExpiryPolicy _expiryPolicy; + private String _initialRoutingAddress; public ExchangeDestination(ExchangeImpl exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) { @@ -76,7 +77,13 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; - int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); + int enqueues = _exchange.send(message, + _initialRoutingAddress == null + ? message.getInitialRoutingAddress() + : _initialRoutingAddress, + instanceProperties, + txn, + null); return enqueues == 0 ? REJECTED : ACCEPTED; @@ -102,4 +109,14 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina { return _exchange; } + + public void setInitialRoutingAddress(final String initialRoutingAddress) + { + _initialRoutingAddress = initialRoutingAddress; + } + + public String getInitialRoutingAddress() + { + return _initialRoutingAddress; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java index dedb8a3dc0..9f0c1070d3 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java @@ -35,13 +35,13 @@ public class NodeReceivingDestination implements ReceivingDestination public static final Rejected REJECTED = new Rejected(); private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED}; - private MessageDestination _exchange; + private MessageDestination _destination; private TerminusDurability _durability; private TerminusExpiryPolicy _expiryPolicy; - public NodeReceivingDestination(MessageDestination exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) + public NodeReceivingDestination(MessageDestination destination, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) { - _exchange = exchange; + _destination = destination; _durability = durable; _expiryPolicy = expiryPolicy; } @@ -76,7 +76,7 @@ public class NodeReceivingDestination implements ReceivingDestination return null; }}; - int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); + int enqueues = _destination.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; @@ -100,6 +100,6 @@ public class NodeReceivingDestination implements ReceivingDestination public MessageDestination getDestination() { - return _exchange; + return _destination; } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 394ab69990..24395a6fad 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -230,7 +230,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } - String binding = ""; + String binding = null; Map filters = source.getFilter(); Map actualFilters = new HashMap(); @@ -298,8 +298,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } _queue = queue; source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - - exchange.addBinding(binding, queue,null); + if(binding != null) + { + exchange.addBinding(binding, queue,null); + } + if(exchangeDestination.getInitialRoutingAddress() != null) + { + exchange.addBinding(exchangeDestination.getInitialRoutingAddress(),queue,null); + } source.setDistributionMode(StdDistMode.COPY); qd = new QueueDestination(queue); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 78dcab9d75..411117be4d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -127,17 +127,17 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel