summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-05 17:33:23 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-05 17:33:23 +0000
commitb419785066a84a18b45ccf88c09cf36cda01f8e2 (patch)
tree2e68cdcf60ebb6fa76356c22136beb04c53fc732
parent3b47a636912b9981b4e75f751337fc60f52201f4 (diff)
downloadqpid-python-b419785066a84a18b45ccf88c09cf36cda01f8e2.tar.gz
QPID-5605 : [Java Broker] [AMQP 1.0] allow use of addresses of the form <exchange>/<routing-key>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574582 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java19
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java10
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java12
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java83
4 files changed, 96 insertions, 28 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index fc2c0d93d0..9cba1b6977 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -38,6 +38,7 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
private ExchangeImpl _exchange;
private TerminusDurability _durability;
private TerminusExpiryPolicy _expiryPolicy;
+ private String _initialRoutingAddress;
public ExchangeDestination(ExchangeImpl exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
{
@@ -76,7 +77,13 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
return null;
}};
- int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
+ int enqueues = _exchange.send(message,
+ _initialRoutingAddress == null
+ ? message.getInitialRoutingAddress()
+ : _initialRoutingAddress,
+ instanceProperties,
+ txn,
+ null);
return enqueues == 0 ? REJECTED : ACCEPTED;
@@ -102,4 +109,14 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
{
return _exchange;
}
+
+ public void setInitialRoutingAddress(final String initialRoutingAddress)
+ {
+ _initialRoutingAddress = initialRoutingAddress;
+ }
+
+ public String getInitialRoutingAddress()
+ {
+ return _initialRoutingAddress;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index dedb8a3dc0..9f0c1070d3 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -35,13 +35,13 @@ public class NodeReceivingDestination implements ReceivingDestination
public static final Rejected REJECTED = new Rejected();
private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
- private MessageDestination _exchange;
+ private MessageDestination _destination;
private TerminusDurability _durability;
private TerminusExpiryPolicy _expiryPolicy;
- public NodeReceivingDestination(MessageDestination exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
+ public NodeReceivingDestination(MessageDestination destination, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
{
- _exchange = exchange;
+ _destination = destination;
_durability = durable;
_expiryPolicy = expiryPolicy;
}
@@ -76,7 +76,7 @@ public class NodeReceivingDestination implements ReceivingDestination
return null;
}};
- int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
+ int enqueues = _destination.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
return enqueues == 0 ? REJECTED : ACCEPTED;
@@ -100,6 +100,6 @@ public class NodeReceivingDestination implements ReceivingDestination
public MessageDestination getDestination()
{
- return _exchange;
+ return _destination;
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 394ab69990..24395a6fad 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -230,7 +230,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
- String binding = "";
+ String binding = null;
Map<Symbol,Filter> filters = source.getFilter();
Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
@@ -298,8 +298,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
_queue = queue;
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
-
- exchange.addBinding(binding, queue,null);
+ if(binding != null)
+ {
+ exchange.addBinding(binding, queue,null);
+ }
+ if(exchangeDestination.getInitialRoutingAddress() != null)
+ {
+ exchange.addBinding(exchangeDestination.getInitialRoutingAddress(),queue,null);
+ }
source.setDistributionMode(StdDistMode.COPY);
qd = new QueueDestination(queue);
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 78dcab9d75..411117be4d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -127,17 +127,17 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- MessageSource queue = getVirtualHost().getMessageSource(addr);
- if(queue != null)
+ if(!addr.startsWith("/") && addr.contains("/"))
{
- destination = new MessageSourceDestination(queue);
- }
- else
- {
- ExchangeImpl exchg = getVirtualHost().getExchange(addr);
+ String[] parts = addr.split("/",2);
+ ExchangeImpl exchg = getVirtualHost().getExchange(parts[0]);
if(exchg != null)
{
- destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+ ExchangeDestination exchangeDestination =
+ new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+ exchangeDestination.setInitialRoutingAddress(parts[1]);
+ destination = exchangeDestination;
+
}
else
{
@@ -145,6 +145,27 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
destination = null;
}
}
+ else
+ {
+ MessageSource queue = getVirtualHost().getMessageSource(addr);
+ if(queue != null)
+ {
+ destination = new MessageSourceDestination(queue);
+ }
+ else
+ {
+ ExchangeImpl exchg = getVirtualHost().getExchange(addr);
+ if(exchg != null)
+ {
+ destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+ }
+ else
+ {
+ endpoint.setSource(null);
+ destination = null;
+ }
+ }
+ }
}
else
@@ -265,28 +286,52 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
String addr = target.getAddress();
- MessageDestination messageDestination = getVirtualHost().getMessageDestination(addr);
- if(messageDestination != null)
- {
- destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
- target.getExpiryPolicy());
- }
- else
+ if(!addr.startsWith("/") && addr.contains("/"))
{
- AMQQueue queue = getVirtualHost().getQueue(addr);
- if(queue != null)
+ String[] parts = addr.split("/",2);
+ ExchangeImpl exchange = getVirtualHost().getExchange(parts[0]);
+ if(exchange != null)
{
+ ExchangeDestination exchangeDestination =
+ new ExchangeDestination(exchange,
+ target.getDurable(),
+ target.getExpiryPolicy());
+
+ exchangeDestination.setInitialRoutingAddress(parts[1]);
+
+ destination = exchangeDestination;
- destination = new QueueDestination(queue);
}
else
{
endpoint.setTarget(null);
destination = null;
}
-
}
+ else
+ {
+ MessageDestination messageDestination = getVirtualHost().getMessageDestination(addr);
+ if(messageDestination != null)
+ {
+ destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+ target.getExpiryPolicy());
+ }
+ else
+ {
+ AMQQueue queue = getVirtualHost().getQueue(addr);
+ if(queue != null)
+ {
+
+ destination = new QueueDestination(queue);
+ }
+ else
+ {
+ endpoint.setTarget(null);
+ destination = null;
+ }
+ }
+ }
}
else