summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-26 17:01:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-26 17:01:07 +0000
commit1635ca5849b7c765d5d7be9cd01d46b06349f320 (patch)
treef304353182e02369661b8ecfdde357a288b183e3 /qpid/java/client/src
parent8328e0398707d2cccdadb95fe1b4c4563b930cc1 (diff)
downloadqpid-python-1635ca5849b7c765d5d7be9cd01d46b06349f320.tar.gz
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java2
5 files changed, 59 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0183c30276..9bdcb9e83f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -683,47 +683,48 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
int type = resolveAddressType(dest);
-
+ boolean resolved = false;
switch (type)
{
case AMQDestination.QUEUE_TYPE:
- {
+
+ setLegacyFieldsForQueueType(dest);
if(createNode)
{
- setLegacyFieldsForQueueType(dest);
handleQueueNodeCreation(dest,noLocal);
- break;
+ resolved = true;
}
else if (isQueueExist(dest,assertNode))
{
- setLegacyFieldsForQueueType(dest);
- break;
+ resolved = true;
}
- }
+ break;
case AMQDestination.TOPIC_TYPE:
- {
+
+ setLegacyFieldsForTopicType(dest);
if(createNode)
{
- setLegacyFieldsForTopicType(dest);
verifySubject(dest);
handleExchangeNodeCreation(dest);
- break;
+ resolved = true;
}
else if (isExchangeExist(dest,assertNode))
{
- setLegacyFieldsForTopicType(dest);
verifySubject(dest);
- break;
+ resolved = true;
}
- }
+ break;
default:
throw new AMQException(
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(System.currentTimeMillis());
+ if(resolved)
+ {
+ dest.setAddressResolved(System.currentTimeMillis());
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 68b7cf1f88..dc1f9a719e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -645,18 +645,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
Link link = destination.getLink();
if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
{
- arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs());
+ arguments.putAll(link.getSubscription().getArgs());
}
boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+ String queue = queueName == null ? destination.getAddressName() : queueName.toString();
getQpidSession().messageSubscribe
- (queueName.toString(), String.valueOf(tag),
+ (queue, String.valueOf(tag),
acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+ String consumerTag = (consumer).getConsumerTagString();
if (capacity == 0)
{
@@ -1175,7 +1176,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
catch(SessionException e)
{
- if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+ if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED
+ || e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
{
match = false;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 0145d15111..b0615ea99f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -514,6 +514,16 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
queueName = preprocessAddressTopic(consumer, queueName);
+ AMQDestination destination = consumer.getDestination();
+
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
+ Link link = destination.getLink();
+ if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
+ {
+ arguments.putAll(link.getSubscription().getArgs());
+ }
+
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
new AMQShortString(String.valueOf(tag)),
@@ -521,7 +531,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
nowait,
- consumer.getArguments());
+ FieldTable.convertToFieldTable(arguments));
AMQFrame jmsConsume = body.generateFrame(getChannelId());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 99154e820f..91e2143c47 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -43,7 +43,8 @@ public class AddressHelper
public static final String LINK = "link";
public static final String X_DECLARE = "x-declare";
public static final String X_BINDINGS = "x-bindings";
- public static final String X_SUBSCRIBE = "x-subscribes";
+ public static final String X_SUBSCRIBES = "x-subscribes";
+ public static final String X_SUBSCRIBE = "x-subscribe";
public static final String CREATE = "create";
public static final String ASSERT = "assert";
public static final String DELETE = "delete";
@@ -265,19 +266,32 @@ public class AddressHelper
Map linkMap = (Map) _address.getOptions().get(LINK);
- if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE))
- {
- Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
-
- if (x_subscribe.containsKey(ARGUMENTS))
+ if (linkMap != null)
+ {
+ Map x_subscribe = null;
+
+ if(linkMap.containsKey(X_SUBSCRIBE))
{
- link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+ x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+ }
+ else if(linkMap.containsKey(X_SUBSCRIBES))
+ {
+ // left in for backwards compatibility with old broken constant
+ x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBES);
+ }
+
+ if(x_subscribe != null)
+ {
+ if (x_subscribe.containsKey(ARGUMENTS))
+ {
+ link.getSubscription().setArgs((Map<String, Object>) x_subscribe.get(ARGUMENTS));
+ }
+
+ boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+ Boolean.parseBoolean((String) x_subscribe.get(EXCLUSIVE)) : false;
+
+ link.getSubscription().setExclusive(exclusive);
}
-
- boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
- Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
-
- link.getSubscription().setExclusive(exclusive);
}
link.setBindings(getBindings(linkMap));
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index a614690f83..7e9cb3072a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -28,7 +28,7 @@ import java.util.Map;
import org.apache.qpid.client.AMQDestination.Binding;
public class Link
-{
+{
public enum FilterType { SQL92, XQUERY, SUBJECT }
public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }