diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-26 17:01:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-26 17:01:07 +0000 |
| commit | 1635ca5849b7c765d5d7be9cd01d46b06349f320 (patch) | |
| tree | f304353182e02369661b8ecfdde357a288b183e3 /qpid/java/client/src | |
| parent | 8328e0398707d2cccdadb95fe1b4c4563b930cc1 (diff) | |
| download | qpid-python-1635ca5849b7c765d5d7be9cd01d46b06349f320.tar.gz | |
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
5 files changed, 59 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0183c30276..9bdcb9e83f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -683,47 +683,48 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic int type = resolveAddressType(dest); - + boolean resolved = false; switch (type) { case AMQDestination.QUEUE_TYPE: - { + + setLegacyFieldsForQueueType(dest); if(createNode) { - setLegacyFieldsForQueueType(dest); handleQueueNodeCreation(dest,noLocal); - break; + resolved = true; } else if (isQueueExist(dest,assertNode)) { - setLegacyFieldsForQueueType(dest); - break; + resolved = true; } - } + break; case AMQDestination.TOPIC_TYPE: - { + + setLegacyFieldsForTopicType(dest); if(createNode) { - setLegacyFieldsForTopicType(dest); verifySubject(dest); handleExchangeNodeCreation(dest); - break; + resolved = true; } else if (isExchangeExist(dest,assertNode)) { - setLegacyFieldsForTopicType(dest); verifySubject(dest); - break; + resolved = true; } - } + break; default: throw new AMQException( "The name '" + dest.getAddressName() + "' supplied in the address doesn't resolve to an exchange or a queue"); } - dest.setAddressResolved(System.currentTimeMillis()); + if(resolved) + { + dest.setAddressResolved(System.currentTimeMillis()); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 68b7cf1f88..dc1f9a719e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -645,18 +645,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic Link link = destination.getLink(); if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) { - arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs()); + arguments.putAll(link.getSubscription().getArgs()); } boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; + String queue = queueName == null ? destination.getAddressName() : queueName.toString(); getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), + (queue, String.valueOf(tag), acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); + String consumerTag = (consumer).getConsumerTagString(); if (capacity == 0) { @@ -1175,7 +1176,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } catch(SessionException e) { - if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) + if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED + || e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND) { match = false; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 0145d15111..b0615ea99f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -514,6 +514,16 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { queueName = preprocessAddressTopic(consumer, queueName); + AMQDestination destination = consumer.getDestination(); + + Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + + Link link = destination.getLink(); + if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) + { + arguments.putAll(link.getSubscription().getArgs()); + } + BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, new AMQShortString(String.valueOf(tag)), @@ -521,7 +531,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, consumer.isExclusive(), nowait, - consumer.getArguments()); + FieldTable.convertToFieldTable(arguments)); AMQFrame jmsConsume = body.generateFrame(getChannelId()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 99154e820f..91e2143c47 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -43,7 +43,8 @@ public class AddressHelper public static final String LINK = "link"; public static final String X_DECLARE = "x-declare"; public static final String X_BINDINGS = "x-bindings"; - public static final String X_SUBSCRIBE = "x-subscribes"; + public static final String X_SUBSCRIBES = "x-subscribes"; + public static final String X_SUBSCRIBE = "x-subscribe"; public static final String CREATE = "create"; public static final String ASSERT = "assert"; public static final String DELETE = "delete"; @@ -265,19 +266,32 @@ public class AddressHelper Map linkMap = (Map) _address.getOptions().get(LINK); - if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE)) - { - Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); - - if (x_subscribe.containsKey(ARGUMENTS)) + if (linkMap != null) + { + Map x_subscribe = null; + + if(linkMap.containsKey(X_SUBSCRIBE)) { - link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS)); + x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); + } + else if(linkMap.containsKey(X_SUBSCRIBES)) + { + // left in for backwards compatibility with old broken constant + x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBES); + } + + if(x_subscribe != null) + { + if (x_subscribe.containsKey(ARGUMENTS)) + { + link.getSubscription().setArgs((Map<String, Object>) x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String) x_subscribe.get(EXCLUSIVE)) : false; + + link.getSubscription().setExclusive(exclusive); } - - boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? - Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; - - link.getSubscription().setExclusive(exclusive); } link.setBindings(getBindings(linkMap)); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index a614690f83..7e9cb3072a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -28,7 +28,7 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination.Binding; public class Link -{ +{ public enum FilterType { SQL92, XQUERY, SUBJECT } public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE } |
