From 2e7e6fed93a10f90fd38228ffb034d53450f2413 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Sun, 5 Sep 2010 18:51:15 +0000 Subject: QPID-2418: updates to fix test failures when using the 0-10 client test profiles. Use a transacted session when querying for queue counts following consumption, as the 0-10 client batches auto-acks asynchronously. Always send the selector filter argument even if empty, to allow querying the brokers via 0-10 to detect whether the selector is being added/removed/modified at subscribe time. Enable the Java broker to perform argument matching during the 0-10 isBound check. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@992856 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 31 +++++++++++++++++----- .../org/apache/qpid/client/AMQSession_0_10.java | 5 ---- .../org/apache/qpid/client/AMQSession_0_8.java | 8 ++++++ .../qpid/test/unit/message/TestAMQSession.java | 7 +++++ 4 files changed, 39 insertions(+), 12 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b96b32d990..0f7e0b0812 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,6 +25,7 @@ import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -1066,10 +1068,21 @@ public abstract class AMQSession args = new HashMap(); + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + + // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); + boolean isQueueBoundForTopicAndSelector = + isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); + + if (isQueueBound && !isQueueBoundForTopicAndSelector) { deleteQueue(dest.getAMQQueueName()); } @@ -1089,6 +1102,7 @@ public abstract class AMQSession args) throws JMSException; /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 75db5d5673..c1021e121c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -318,11 +318,6 @@ public class AMQSession_0_10 extends AMQSession args) throws JMSException + { + return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName), + queueName == null ? null : new AMQShortString(queueName), + bindingKey == null ? null : new AMQShortString(bindingKey)); + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index f7a37e4894..47c0359b94 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import javax.jms.*; + import java.util.Map; public class TestAMQSession extends AMQSession @@ -188,4 +189,10 @@ public class TestAMQSession extends AMQSession args) throws JMSException + { + return false; + } } -- cgit v1.2.1