From 2e7e6fed93a10f90fd38228ffb034d53450f2413 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Sun, 5 Sep 2010 18:51:15 +0000 Subject: QPID-2418: updates to fix test failures when using the 0-10 client test profiles. Use a transacted session when querying for queue counts following consumption, as the 0-10 client batches auto-acks asynchronously. Always send the selector filter argument even if empty, to allow querying the brokers via 0-10 to detect whether the selector is being added/removed/modified at subscribe time. Enable the Java broker to perform argument matching during the 0-10 isBound check. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@992856 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/transport/ServerSessionDelegate.java | 4 ++- .../java/org/apache/qpid/client/AMQSession.java | 31 ++++++++++++++---- .../org/apache/qpid/client/AMQSession_0_10.java | 5 --- .../org/apache/qpid/client/AMQSession_0_8.java | 8 +++++ .../qpid/test/unit/message/TestAMQSession.java | 7 ++++ .../qpid/server/logging/BindingLoggingTest.java | 5 +-- .../qpid/test/unit/message/StreamMessageTest.java | 1 + .../test/unit/topic/DurableSubscriptionTest.java | 37 ++++++++++++---------- java/test-profiles/CPPExcludes | 7 ---- java/test-profiles/Java010Excludes | 7 ---- java/test-profiles/cpp.async.excludes | 3 -- 11 files changed, 67 insertions(+), 48 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 44a677a76d..7b51b68e61 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -756,7 +756,9 @@ public class ServerSessionDelegate extends SessionDelegate if(method.hasArguments()) { - // TODO + FieldTable args = FieldTable.convertToFieldTable(method.getArguments()); + + result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue)); } if(queueMatched) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b96b32d990..0f7e0b0812 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,6 +25,7 @@ import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -1066,10 +1068,21 @@ public abstract class AMQSession args = new HashMap(); + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + + // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); + boolean isQueueBoundForTopicAndSelector = + isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); + + if (isQueueBound && !isQueueBoundForTopicAndSelector) { deleteQueue(dest.getAMQQueueName()); } @@ -1089,6 +1102,7 @@ public abstract class AMQSession args) throws JMSException; /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 75db5d5673..c1021e121c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -318,11 +318,6 @@ public class AMQSession_0_10 extends AMQSession args) throws JMSException + { + return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName), + queueName == null ? null : new AMQShortString(queueName), + bindingKey == null ? null : new AMQShortString(bindingKey)); + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index f7a37e4894..47c0359b94 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import javax.jms.*; + import java.util.Map; public class TestAMQSession extends AMQSession @@ -188,4 +189,10 @@ public class TestAMQSession extends AMQSession args) throws JMSException + { + return false; + } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java index afda7d4ba9..51815e2adc 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java @@ -94,7 +94,7 @@ public class BindingLoggingTest extends AbstractTestLogging * 2. New Client requests that a Queue is bound to a new exchange. * Output: * - * BND-1001 : Create + * BND-1001 : Create : Arguments : {x-filter-jms-selector=} * * Validation Steps: * 3. The BND ID is correct @@ -117,6 +117,7 @@ public class BindingLoggingTest extends AbstractTestLogging validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName); exchange = "direct/amq.direct"; + message = "Create : Arguments : {x-filter-jms-selector=}"; validateLogMessage(getLogMessage(results, 1), messageID, message, exchange, queueName, queueName); } @@ -129,7 +130,7 @@ public class BindingLoggingTest extends AbstractTestLogging * 2. Java Client consumes from a topic with a JMS selector. * Output: * - * BND-1001 : Create : Arguments : + * BND-1001 : Create : Arguments : {x-filter-jms-selector=} * * Validation Steps: * 3. The BND ID is correct diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 98d59982e5..0f799073b4 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -75,6 +75,7 @@ public class StreamMessageTest extends QpidBrokerTestCase ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); FieldTable ft = new FieldTable(); + ft.setString("x-match", "any"); ft.setString("F1000", "1"); MessageConsumer consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 3dd3c72024..d73761d12a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -544,23 +544,22 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector","Match = False", false); - //verify no messages are now present as changing selector should have issued - //an unsubscribe and thus deleted the previous backing queue for the subscription. + //verify no messages are now recieved. rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); - assertNull("Should not have received message as the queue underlying the " + - "subscription should have been cleared/deleted when the selector was changed", rMsg); + assertNull("Should not have received message as the selector was changed", rMsg); // Check that new messages are received properly sendMatchingAndNonMatchingMessage(session, producer); rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); - assertNotNull("Message should not be received", rMsg); + assertNotNull("Message should have been received", rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelector2", ((TextMessage) rMsg).getText()); + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); - assertNull("Message should be received",rMsg); + assertNull("Message should not have been received",rMsg); session.unsubscribe("testResubscribeWithChangedSelector"); } @@ -613,7 +612,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase { Connection conn = getConnection(); conn.start(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector"); //create and register a durable subscriber with a message selector and then close it @@ -630,15 +629,17 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase message.setBooleanProperty("testprop", false); producer.send(message); } + session.commit(); producer.close(); + // should be 5 or 10 messages on queue now + // (5 for the java broker due to use of server side selectors, and 10 for the cpp broker due to client side selectors only) + AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); + assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession) session).getQueueDepth(queue)); + // now recreate the durable subscriber and check the received messages TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); - // should be 5 messages on queue now - AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); - assertEquals("Queue depth is wrong", 5, ((AMQSession) session).getQueueDepth(queue)); - for (int i = 0; i < 5; i++) { Message message = subTwo.receive(1000); @@ -653,6 +654,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase } } + session.commit(); + // Check queue has no messages assertEquals("Queue should be empty", 0, ((AMQSession) session).getQueueDepth(queue)); @@ -710,10 +713,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg.setBooleanProperty("Match", false); producer.send(msg); - // should be 1 message on queue now + // should be 1 or 2 messages on queue now + // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); - assertEquals("Queue depth is wrong", 1, ((AMQSession) session).getQueueDepth(queue)); - + assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession) session).getQueueDepth(queue)); + Message rMsg = subB.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", @@ -773,9 +777,10 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg.setBooleanProperty("testprop", false); producer.send(msg); - // should be 1 message on queue now + // should be 1 or 2 messages on queue now + // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); - assertEquals("Queue depth is wrong", 1, ((AMQSession) session).getQueueDepth(queue)); + assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession) session).getQueueDepth(queue)); Message rMsg = subTwo.receive(1000); assertNotNull(rMsg); diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes index fe1ce82b7e..711a3954e4 100755 --- a/java/test-profiles/CPPExcludes +++ b/java/test-profiles/CPPExcludes @@ -125,13 +125,6 @@ org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage // Temporarily adding the following until the issues are sorted out. org.apache.qpid.test.unit.client.AMQConnectionTest#testHeartBeat -//QPID-2418 : Not yet implemented on 0-10 -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelector -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubSameMessageSelector -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelectorNoClose -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubAddMessageSelectorNoClose -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubNoSelectorResubscribeNoClose - //Excluded due to QPID-1447 : CPP broker does not have SlowConsumer Disconnection org.apache.qpid.systest.GlobalQueuesTest#* org.apache.qpid.systest.GlobalTopicsTest#* diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes index 47fca90500..6da362f33a 100755 --- a/java/test-profiles/Java010Excludes +++ b/java/test-profiles/Java010Excludes @@ -62,10 +62,3 @@ org.apache.qpid.transport.network.mina.MINANetworkDriverTest#* org.apache.qpid.test.unit.basic.LargeMessageTest#* org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchange org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode - -//QPID-2418 : Not yet implemented on 0-10 -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelector -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubSameMessageSelector -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelectorNoClose -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubAddMessageSelectorNoClose -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubNoSelectorResubscribeNoClose diff --git a/java/test-profiles/cpp.async.excludes b/java/test-profiles/cpp.async.excludes index 72d79fb754..b6479a00ba 100644 --- a/java/test-profiles/cpp.async.excludes +++ b/java/test-profiles/cpp.async.excludes @@ -1,5 +1,2 @@ -// the C++ broker doesn't implement selectors, so they are not persisted with the subscription -org.apache.qpid.test.unit.ct.DurableSubscriberTest#testDurSubRestoresMessageSelector - // the C++ broker doesn't guarantee the order of messages on recovery org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash -- cgit v1.2.1