From 559e9702d5a7c26dddee708e267f2f685d4de89e Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 24 Apr 2008 01:54:20 +0000 Subject: QPID-832 merge M2.x git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651133 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/AMQQueueDeferredOrderingTest.java | 2 +- .../client/MessageListenerMultiConsumerTest.java | 6 + .../apache/qpid/client/MessageListenerTest.java | 7 + .../qpid/client/ResetMessageListenerTest.java | 24 ++- .../qpid/test/unit/basic/PropertyValueTest.java | 12 +- .../apache/qpid/test/unit/basic/SelectorTest.java | 187 +++++++++++++++++++-- .../qpid/test/unit/basic/close/CloseTest.java | 74 ++++++++ .../qpid/test/unit/basic/close/CloseTests.java | 74 -------- .../qpid/test/unit/client/AMQSessionTest.java | 15 +- .../ChannelCloseMethodHandlerNoCloseOk.java | 13 +- .../unit/client/channelclose/ChannelCloseTest.java | 123 +++++++++----- .../client/channelclose/NoCloseOKStateManager.java | 44 +---- .../unit/client/connection/ConnectionTest.java | 104 ++++++++++-- .../client/connectionurl/ConnectionURLTest.java | 19 +++ .../client/protocol/AMQProtocolSessionTest.java | 7 +- .../qpid/test/unit/close/CloseBeforeAckTest.java | 4 +- .../qpid/test/unit/ct/DurableSubscriberTest.java | 167 ++++++++++++++++++ .../qpid/test/unit/ct/DurableSubscriberTests.java | 167 ------------------ .../qpid/test/unit/message/JMSPropertiesTest.java | 31 ++++ .../test/unit/topic/DurableSubscriptionTest.java | 132 +++++++++++++-- .../test/unit/transacted/CommitRollbackTest.java | 12 +- 21 files changed, 829 insertions(+), 395 deletions(-) create mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTest.java delete mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java create mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java delete mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTests.java (limited to 'java/client/src/test') diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java index 8724c65b61..fe418535d6 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java @@ -126,7 +126,7 @@ public class AMQQueueDeferredOrderingTest extends TestCase _logger.info("Consuming messages"); for (int i = 0; i < NUM_MESSAGES; i++) { - Message msg = consumer.receive(1500); + Message msg = consumer.receive(3000); assertNotNull("Message should not be null", msg); assertTrue("Message should be a text message", msg instanceof TextMessage); assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText()); diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 75e50ee09b..136b9b94b6 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -20,13 +20,17 @@ */ package org.apache.qpid.client; +import junit.framework.TestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.testutil.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -34,7 +38,9 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 2eb511f8cd..12b84b1495 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -20,13 +20,17 @@ */ package org.apache.qpid.client; +import junit.framework.TestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.testutil.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -34,6 +38,9 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; + +import java.util.Hashtable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java index 882915fb8f..c920499a07 100644 --- a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -29,7 +29,16 @@ import org.apache.qpid.testutil.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.spi.InitialContextFactory; @@ -65,12 +74,15 @@ public class ResetMessageListenerTest extends QpidTestCase private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); // all messages Sent Lock private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); // all messages Sent Lock private final CountDownLatch _allFirstMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock - private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock + + private String oldImmediatePrefetch; protected void setUp() throws Exception { super.setUp(); + oldImmediatePrefetch = System.getProperty(AMQSession.IMMEDIATE_PREFETCH); System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true"); _clientConnection = getConnection("guest", "guest"); @@ -109,8 +121,12 @@ public class ResetMessageListenerTest extends QpidTestCase { _clientConnection.close(); - _producerConnection.close(); super.tearDown(); + if (oldImmediatePrefetch == null) + { + oldImmediatePrefetch = AMQSession.IMMEDIATE_PREFETCH_DEFAULT; + } + System.setProperty(AMQSession.IMMEDIATE_PREFETCH, oldImmediatePrefetch); } public void testAsynchronousRecieve() @@ -238,7 +254,7 @@ public class ResetMessageListenerTest extends QpidTestCase try { - _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); + _allSecondMessagesSent.await(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index 737daeb350..61ba3aad3a 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -60,6 +60,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener private final List messages = new ArrayList(); private int _count = 1; public String _connectionString = "vm://:1"; + private static final String USERNAME = "guest"; protected void setUp() throws Exception { @@ -171,7 +172,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); - _logger.trace("Message:" + m); + _logger.debug("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); @@ -287,7 +288,14 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener ((AMQMessage) m).setVoidProperty(new AMQShortString("void")); Assert.assertTrue("Check void properties are correctly transported", - ((AMQMessage) m).getPropertyHeaders().containsKey("void")); + ((AMQMessage) m).getPropertyHeaders().containsKey("void")); + + //JMSXUserID + if (m.getStringProperty("JMSXUserID") != null) + { + Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME, + m.getStringProperty("JMSXUserID")); + } } received.clear(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java index ed4f6041df..987b30ce28 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -21,7 +21,7 @@ package org.apache.qpid.test.unit.basic; import junit.framework.TestCase; - +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -29,11 +29,14 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.testutil.QpidTestCase; +import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -47,11 +50,12 @@ public class SelectorTest extends QpidTestCase implements MessageListener private AMQSession _session; private int count; public String _connectionString = "vm://:1"; + private static final String INVALID_SELECTOR = "Cost LIKE 5"; protected void setUp() throws Exception { super.setUp(); - init((AMQConnection) getConnection("guest", "guest")); + init((AMQConnection) getConnection("guest", "guest")); } protected void tearDown() throws Exception @@ -59,19 +63,19 @@ public class SelectorTest extends QpidTestCase implements MessageListener super.tearDown(); } - private void init(AMQConnection connection) throws Exception + private void init(AMQConnection connection) throws JMSException { init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true)); } - private void init(AMQConnection connection, AMQDestination destination) throws Exception + private void init(AMQConnection connection, AMQDestination destination) throws JMSException { _connection = connection; _destination = destination; connection.start(); String selector = null; - // selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; + selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -79,13 +83,17 @@ public class SelectorTest extends QpidTestCase implements MessageListener _session.createConsumer(destination, selector).setMessageListener(this); } - public synchronized void test() throws JMSException, InterruptedException + public synchronized void test() throws Exception { try { + + init((AMQConnection) getConnection("guest", "guest", randomize("Client"))); + Message msg = _session.createTextMessage("Message"); msg.setJMSPriority(1); msg.setIntProperty("Cost", 2); + msg.setStringProperty("property-with-hyphen", "wibble"); msg.setJMSType("Special"); _logger.info("Sending Message:" + msg); @@ -105,10 +113,147 @@ public class SelectorTest extends QpidTestCase implements MessageListener // throw new RuntimeException("Did not get message!"); } } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + System.out.println("SUCCESS!!"); + } + } + catch (InterruptedException e) + { + _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage()); + } + catch (URLSyntaxException e) + { + _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + fail("Wrong exception"); + } + catch (AMQException e) + { + _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + fail("Wrong exception"); + } + finally { - _session.close(); - _connection.close(); + if (_session != null) + { + _session.close(); + } + if (_connection != null) + { + _connection.close(); + } + } + } + + + public void testInvalidSelectors() throws Exception + { + Connection connection = null; + + try + { + connection = getConnection("guest", "guest", randomize("Client")); + _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + } + catch (JMSException e) + { + fail(e.getMessage()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + catch (URLSyntaxException e) + { + fail("Error:" + e.getMessage()); + } + + //Try Creating a Browser + try + { + _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + //Try Creating a Consumer + try + { + _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + //Try Creating a Receiever + try + { + _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + finally + { + if (_session != null) + { + try + { + _session.close(); + } + catch (JMSException e) + { + fail("Error cleaning up:" + e.getMessage()); + } + } + if (_connection != null) + { + try + { + _connection.close(); + } + catch (JMSException e) + { + fail("Error cleaning up:" + e.getMessage()); + } + } } } @@ -127,9 +272,29 @@ public class SelectorTest extends QpidTestCase implements MessageListener public static void main(String[] argv) throws Exception { SelectorTest test = new SelectorTest(); - test._connectionString = (argv.length == 0) ? "localhost:5672" : argv[0]; - test.setUp(); - test.test(); + test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0]; + + try + { + while (true) + { + if (test._connectionString.contains("vm://:1")) + { + test.setUp(); + } + test.test(); + + if (test._connectionString.contains("vm://:1")) + { + test.tearDown(); + } + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + e.printStackTrace(); + } } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTest.java new file mode 100644 index 0000000000..6f1ddebb0c --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.basic.close; +import org.apache.qpid.AMQException; +import org.apache.qpid.testutil.QpidTestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +public class CloseTest extends QpidTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(CloseTest.class); + + private static final String BROKER = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.setUp(); + } + + public void testCloseQueueReceiver() throws Exception + { + AMQConnection connection = (AMQConnection) getConnection("guest", "guest"); + + Session session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue")); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue); + + connection.start(); + + _logger.info("About to close consumer"); + + consumer.close(); + + _logger.info("Closed Consumer"); + connection.close(); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java deleted file mode 100644 index 10c054a863..0000000000 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.test.unit.basic.close; -import org.apache.qpid.AMQException; -import org.apache.qpid.testutil.QpidTestCase; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -public class CloseTests extends QpidTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(CloseTests.class); - - private static final String BROKER = "vm://:1"; - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception - { - super.setUp(); - } - - public void testCloseQueueReceiver() throws Exception - { - AMQConnection connection = (AMQConnection) getConnection("guest", "guest"); - - Session session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - - AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue")); - MessageConsumer consumer = session.createConsumer(queue); - - MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue); - - connection.start(); - - _logger.info("About to close consumer"); - - consumer.close(); - - _logger.info("Closed Consumer"); - connection.close(); - } -} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java index 6a4e01affd..965c22af4a 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -75,18 +75,11 @@ public class AMQSessionTest extends QpidTestCase public void testCreateDurableSubscriber() throws JMSException { - try - { - TopicSubscriber subscriber = _session.createDurableSubscriber(_topic, "mysubname"); - assertEquals("Topic names should match from durable TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + TopicSubscriber subscriber = _session.createDurableSubscriber(_topic, "mysubname"); + assertEquals("Topic names should match from durable TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName()); - subscriber = _session.createDurableSubscriber(_topic, "mysubname", "abc", false); - assertEquals("Topic names should match from durable TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName()); - } - catch (Throwable e) - { - e.printStackTrace(); - } + subscriber = _session.createDurableSubscriber(_topic, "mysubname2", "abc", false); + assertEquals("Topic names should match from durable TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName()); } public void testCreateQueueReceiver() throws JMSException diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java index 85fcf6d95a..b6776a1a44 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java @@ -37,7 +37,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener +public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandlerNoCloseOk.class); @@ -48,14 +48,15 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe return _handler; } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) + public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId) throws AMQException { _logger.debug("ChannelClose method received"); - ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); + final AMQProtocolSession session = stateManager.getProtocolSession(); - AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); - AMQShortString reason = method.replyText; + + AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); + AMQShortString reason = method.getReplyText(); if (_logger.isDebugEnabled()) { _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); @@ -95,6 +96,6 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe } - protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); + session.channelClosed(channelId, errorCode, String.valueOf(reason)); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 08d6b0bcab..45a9ca1dd6 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -26,17 +26,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.testutil.QpidTestCase; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; @@ -53,6 +49,9 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; public class ChannelCloseTest extends QpidTestCase implements ExceptionListener, ConnectionListener { @@ -140,8 +139,11 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener, /* close channel and send guff then send ok no errors + REMOVE TEST - The behaviour after server has sent close is undefined. + the server should be free to fail as it may wish to reclaim its resources + immediately after close. */ - public void testSendingMethodsAfterClose() throws Exception + /*public void testSendingMethodsAfterClose() throws Exception { // this is testing an 0.8 connection if(isBroker08()) @@ -167,7 +169,19 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener, // Set StateManager to manager that ignores Close-oks AMQProtocolSession protocolSession= ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); - AMQStateManager newStateManager=new NoCloseOKStateManager(protocolSession); + + MethodDispatcher d = protocolSession.getMethodDispatcher(); + + MethodDispatcher wrappedDispatcher = (MethodDispatcher) + Proxy.newProxyInstance(d.getClass().getClassLoader(), + d.getClass().getInterfaces(), + new MethodDispatcherProxyHandler( + (ClientMethodDispatcherImpl) d)); + + protocolSession.setMethodDispatcher(wrappedDispatcher); + + + AMQStateManager newStateManager=new NoCloseOKStateManager(protocolSession); newStateManager.changeState(oldStateManager.getCurrentState()); ((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager); @@ -257,7 +271,7 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener, } } } - +*/ private void createChannelAndTest(int channel) throws FailoverException { // Create A channel @@ -284,10 +298,9 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener, private void sendClose(int channel) { - AMQFrame frame = - ChannelCloseOkBody.createAMQFrame(channel, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + ChannelCloseOkBody body = + ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody(); + AMQFrame frame = body.generateFrame(channel); ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); } @@ -345,35 +358,43 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener, private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException, FailoverException { - AMQFrame exchangeDeclare = - ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type - - if (nowait) - { - ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare); - } - else - { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, - SYNC_TIMEOUT); - } + ExchangeDeclareBody body = + ((AMQConnection) _connection).getProtocolHandler() + .getMethodRegistry() + .createExchangeDeclareBody(0, + new AMQShortString(_name), + new AMQShortString(_type), + true, + false, + false, + false, + nowait, + null); + AMQFrame exchangeDeclare = body.generateFrame(channelId); + AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler(); + + + if (nowait) + { + protocolHandler.writeFrame(exchangeDeclare); + } + else + { + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + } + +// return null; +// } +// }, (AMQConnection)_connection).execute(); + } private void createChannel(int channelId) throws AMQException, FailoverException { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand + ChannelOpenBody body = + ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); + + ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand ChannelOpenOkBody.class); } @@ -402,4 +423,28 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener, public void failoverComplete() { } + + private static final class MethodDispatcherProxyHandler implements InvocationHandler + { + private final ClientMethodDispatcherImpl _underlyingDispatcher; + private final ChannelCloseMethodHandlerNoCloseOk _handler = ChannelCloseMethodHandlerNoCloseOk.getInstance(); + + + public MethodDispatcherProxyHandler(ClientMethodDispatcherImpl dispatcher) + { + _underlyingDispatcher = dispatcher; + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + if(method.getName().equals("dispatchChannelClose")) + { + _handler.methodReceived(_underlyingDispatcher.getStateManager(), + (ChannelCloseBody) args[0], (Integer)args[1]); + } + Method dispatcherMethod = _underlyingDispatcher.getClass().getMethod(method.getName(), method.getParameterTypes()); + return dispatcherMethod.invoke(_underlyingDispatcher, args); + + } + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java index d128f30727..c7eb745566 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java @@ -59,49 +59,7 @@ public class NoCloseOKStateManager extends AMQStateManager super(protocolSession); } - protected void registerListeners() - { - Map frame2handlerMap = new HashMap(); - - // we need to register a map for the null (i.e. all state) handlers otherwise you get - // a stack overflow in the handler searching code when you present it with a frame for which - // no handlers are registered - // - _state2HandlersMap.put(null, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap); - - // - // ConnectionOpen handlers - // - frame2handlerMap = new HashMap(); - // Use Test Handler for Close methods to not send Close-OKs - frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance()); - - frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); - frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); - frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); - frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); - frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); - frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap); - } + } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 7eb74e2492..f856e8c20b 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.jms.Session; import junit.framework.TestCase; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.QueueSession; import javax.jms.TopicSession; @@ -55,25 +56,30 @@ public class ConnectionTest extends TestCase TransportConnection.killVMBroker(1); } - public void testSimpleConnection() + public void testSimpleConnection() throws Exception { + AMQConnection conn = null; try { - AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); - conn.close(); + conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); } catch (Exception e) { fail("Connection to " + _broker + " should succeed. Reason: " + e); } + finally + { + conn.close(); + } } - public void testDefaultExchanges() + public void testDefaultExchanges() throws Exception { + AMQConnection conn = null; try { - AMQConnection conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='" + conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='" + _broker + "?retries='1''&defaultQueueExchange='test.direct'" + "&defaultTopicExchange='test.topic'" @@ -106,37 +112,53 @@ public class ConnectionTest extends TestCase topicSession.close(); - - conn.close(); } catch (Exception e) { fail("Connection to " + _broker + " should succeed. Reason: " + e); } + finally + { + conn.close(); + } } - //fixme AMQAuthenticationException is not propogaged - public void PasswordFailureConnection() throws Exception + //See QPID-771 + public void testPasswordFailureConnection() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''"); + conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''"); fail("Connection should not be established password is wrong."); } catch (AMQException amqe) { - if (!(amqe instanceof AMQAuthenticationException)) + if (amqe.getCause().getClass() == Exception.class) + { + System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure."); + return; + } + + assertEquals("Exception was wrong type", JMSException.class, amqe.getCause().getClass()); + Exception linked = ((JMSException) amqe.getCause()).getLinkedException(); + assertEquals("Exception was wrong type", AMQAuthenticationException.class, linked.getClass()); + } + finally + { + if (conn != null) { - fail("Correct exception not thrown. Excpected 'AMQAuthenticationException' got: " + amqe); + conn.close(); } } } public void testConnectionFailure() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''"); + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''"); fail("Connection should not be established"); } catch (AMQException amqe) @@ -146,14 +168,22 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe); } } + finally + { + if (conn != null) + { + conn.close(); + } + } } public void testUnresolvedHostFailure() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''"); + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''"); fail("Connection should not be established"); } catch (AMQException amqe) @@ -163,6 +193,38 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe); } } + finally + { + if (conn != null) + { + conn.close(); + } + } + + } + + public void testUnresolvedVirtualHostFailure() throws Exception + { + AMQConnection conn = null; + try + { + conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQConnectionFailureException)) + { + fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } } public void testClientIdCannotBeChanged() throws Exception @@ -180,7 +242,10 @@ public class ConnectionTest extends TestCase } finally { - connection.close(); + if (connection != null) + { + connection.close(); + } } } @@ -188,7 +253,14 @@ public class ConnectionTest extends TestCase { Connection connection = new AMQConnection(_broker, "guest", "guest", null, "test"); - assertNotNull(connection.getClientID()); + try + { + assertNotNull(connection.getClientID()); + } + finally + { + connection.close(); + } connection.close(); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index bfbba61913..27adc4dd77 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -473,6 +473,25 @@ public class ConnectionURLTest extends TestCase } + public void testSocketProtocol() throws URLSyntaxException + { + String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'"; + + try + { + AMQConnectionURL curl = new AMQConnectionURL(url); + assertNotNull(curl); + assertEquals(1, curl.getBrokerCount()); + assertNotNull(curl.getBrokerDetails(0)); + assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport()); + assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost()); + assertEquals("URL does not toString as expected", url, curl.toString()); + } + catch (URLSyntaxException e) + { + fail(e.getMessage()); + } + } public static junit.framework.Test suite() { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 3776ff767f..4cdd7dd7e8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -27,8 +27,9 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.testutil.QpidTestCase; -public class AMQProtocolSessionTest extends TestCase +public class AMQProtocolSessionTest extends QpidTestCase { private static class AMQProtSession extends AMQProtocolSession { @@ -50,7 +51,7 @@ public class AMQProtocolSessionTest extends TestCase } //private Strings for test values and expected results - private String _brokenAddress; + private String _brokenAddress = "tcp://myAddress;:";; private String _generatedAddress; private String _emptyAddress; private String _generatedAddress_2; @@ -64,7 +65,7 @@ public class AMQProtocolSessionTest extends TestCase super.setUp(); //don't care about the values set here apart from the dummy IoSession - _testSession = new AMQProtSession(null,new TestIoSession(),null); + _testSession = new AMQProtSession(null,new TestIoSession(), (AMQConnection) getConnection("guest", "guest")); //initialise addresses for test and expected results _port = 123; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java index e78c295ce5..d25986d991 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java @@ -29,8 +29,8 @@ import org.apache.qpid.testutil.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import uk.co.thebadgerset.junit.concurrency.TestRunnable; -import uk.co.thebadgerset.junit.concurrency.ThreadTestCoordinator; +import org.apache.qpid.junit.concurrency.TestRunnable; +import org.apache.qpid.junit.concurrency.ThreadTestCoordinator; import javax.jms.Connection; import javax.jms.Message; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java new file mode 100644 index 0000000000..9caba63fe4 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java @@ -0,0 +1,167 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.unit.ct; + +import javax.jms.*; + +import org.apache.qpid.testutil.QpidTestCase; + +/** + * Crash Recovery tests for durable subscription + * + */ +public class DurableSubscriberTest extends QpidTestCase +{ + private final String _topicName = "durableSubscriberTopic"; + + /** + * test strategy: + * create and register a durable subscriber then close it + * create a publisher and send a persistant message followed by a non persistant message + * crash and restart the broker + * recreate the durable subscriber and check that only the first message is received + */ + public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception + { + if (!isBroker08()) + { + TopicConnectionFactory factory = getConnectionFactory(); + Topic topic = (Topic) getInitialContext().lookup(_topicName); + //create and register a durable subscriber then close it + TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); + TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub"); + durConnection.start(); + durSub1.close(); + durSession.close(); + durConnection.stop(); + + //create a publisher and send a persistant message followed by a non persistant message + TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); + TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicPublisher publisher = pubSession.createPublisher(topic); + Message message = pubSession.createMessage(); + message.setIntProperty("count", 1); + publisher.publish(message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, + javax.jms.Message.DEFAULT_TIME_TO_LIVE); + message.setIntProperty("count", 2); + publisher.publish(message, javax.jms.DeliveryMode.NON_PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, + javax.jms.Message.DEFAULT_TIME_TO_LIVE); + publisher.close(); + pubSession.close(); + //now stop the server + try + { + shutdownServer(); + } + catch (Exception e) + { + System.out.println("problems shutting down arjuna-ms"); + throw e; + } + //now recreate the durable subscriber and check the received messages + factory = getConnectionFactory(); + topic = (Topic) getInitialContext().lookup(_topicName); + TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); + TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub"); + durConnection2.start(); + Message m1 = durSub2.receive(1000); + if (m1 == null) + { + assertTrue("testDurSubRestoredAfterNonPersistentMessageSent test failed. no message was returned", + false); + } + assertTrue("testDurSubRestoredAfterNonPersistentMessageSent test failed. Wrong message was returned.", + m1.getIntProperty("count") == 1); + durSession2.unsubscribe("dursub"); + durConnection2.close(); + } + } + + /** + * create and register a durable subscriber with a message selector and then close it + * crash the broker + * create a publisher and send 5 right messages and 5 wrong messages + * recreate the durable subscriber and check the received the 5 expected messages + */ + public void testDurSubRestoresMessageSelector() throws Exception + { + if (!isBroker08()) + { + TopicConnectionFactory factory = getConnectionFactory(); + Topic topic = (Topic) getInitialContext().lookup(_topicName); + //create and register a durable subscriber with a message selector and then close it + TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); + TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub", "testprop='true'", false); + durConnection.start(); + durSub1.close(); + durSession.close(); + durConnection.stop(); + //now stop the server + try + { + shutdownServer(); + } + catch (Exception e) + { + System.out.println("problems shutting down arjuna-ms"); + throw e; + } + topic = (Topic) getInitialContext().lookup(_topicName); + factory = getConnectionFactory(); + TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); + TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicPublisher publisher = pubSession.createPublisher(topic); + for (int i = 0; i < 5; i++) + { + Message message = pubSession.createMessage(); + message.setStringProperty("testprop", "true"); + publisher.publish(message); + message = pubSession.createMessage(); + message.setStringProperty("testprop", "false"); + publisher.publish(message); + } + publisher.close(); + pubSession.close(); + + //now recreate the durable subscriber and check the received messages + TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); + TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub"); + durConnection2.start(); + for (int i = 0; i < 5; i++) + { + Message message = durSub2.receive(1000); + if (message == null) + { + assertTrue("testDurSubRestoresMessageSelector test failed. no message was returned", false); + } + else + { + assertTrue("testDurSubRestoresMessageSelector test failed. message selector not reset", + message.getStringProperty("testprop").equals("true")); + } + } + durSession2.unsubscribe("dursub"); + durConnection2.close(); + } + } +} + diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTests.java deleted file mode 100644 index af19db5128..0000000000 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTests.java +++ /dev/null @@ -1,167 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.test.unit.ct; - -import javax.jms.*; - -import org.apache.qpid.testutil.QpidTestCase; - -/** - * Crash Recovery tests for durable subscription - * - */ -public class DurableSubscriberTests extends QpidTestCase -{ - private final String _topicName = "durableSubscriberTopic"; - - /** - * test strategy: - * create and register a durable subscriber then close it - * create a publisher and send a persistant message followed by a non persistant message - * crash and restart the broker - * recreate the durable subscriber and check that only the first message is received - */ - public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception - { - if (!isBroker08()) - { - TopicConnectionFactory factory = getConnectionFactory(); - Topic topic = (Topic) getInitialContext().lookup(_topicName); - //create and register a durable subscriber then close it - TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); - TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub"); - durConnection.start(); - durSub1.close(); - durSession.close(); - durConnection.stop(); - - //create a publisher and send a persistant message followed by a non persistant message - TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); - TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicPublisher publisher = pubSession.createPublisher(topic); - Message message = pubSession.createMessage(); - message.setIntProperty("count", 1); - publisher.publish(message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, - javax.jms.Message.DEFAULT_TIME_TO_LIVE); - message.setIntProperty("count", 2); - publisher.publish(message, javax.jms.DeliveryMode.NON_PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, - javax.jms.Message.DEFAULT_TIME_TO_LIVE); - publisher.close(); - pubSession.close(); - //now stop the server - try - { - shutdownServer(); - } - catch (Exception e) - { - System.out.println("problems shutting down arjuna-ms"); - throw e; - } - //now recreate the durable subscriber and check the received messages - factory = getConnectionFactory(); - topic = (Topic) getInitialContext().lookup(_topicName); - TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); - TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub"); - durConnection2.start(); - Message m1 = durSub2.receive(1000); - if (m1 == null) - { - assertTrue("testDurSubRestoredAfterNonPersistentMessageSent test failed. no message was returned", - false); - } - assertTrue("testDurSubRestoredAfterNonPersistentMessageSent test failed. Wrong message was returned.", - m1.getIntProperty("count") == 1); - durSession2.unsubscribe("dursub"); - durConnection2.close(); - } - } - - /** - * create and register a durable subscriber with a message selector and then close it - * crash the broker - * create a publisher and send 5 right messages and 5 wrong messages - * recreate the durable subscriber and check the received the 5 expected messages - */ - public void testDurSubRestoresMessageSelector() throws Exception - { - if (!isBroker08()) - { - TopicConnectionFactory factory = getConnectionFactory(); - Topic topic = (Topic) getInitialContext().lookup(_topicName); - //create and register a durable subscriber with a message selector and then close it - TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); - TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub", "testprop='true'", false); - durConnection.start(); - durSub1.close(); - durSession.close(); - durConnection.stop(); - //now stop the server - try - { - shutdownServer(); - } - catch (Exception e) - { - System.out.println("problems shutting down arjuna-ms"); - throw e; - } - topic = (Topic) getInitialContext().lookup(_topicName); - factory = getConnectionFactory(); - TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); - TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicPublisher publisher = pubSession.createPublisher(topic); - for (int i = 0; i < 5; i++) - { - Message message = pubSession.createMessage(); - message.setStringProperty("testprop", "true"); - publisher.publish(message); - message = pubSession.createMessage(); - message.setStringProperty("testprop", "false"); - publisher.publish(message); - } - publisher.close(); - pubSession.close(); - - //now recreate the durable subscriber and check the received messages - TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); - TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub"); - durConnection2.start(); - for (int i = 0; i < 5; i++) - { - Message message = durSub2.receive(1000); - if (message == null) - { - assertTrue("testDurSubRestoresMessageSelector test failed. no message was returned", false); - } - else - { - assertTrue("testDurSubRestoresMessageSelector test failed. message selector not reset", - message.getStringProperty("testprop").equals("true")); - } - } - durSession2.unsubscribe("dursub"); - durConnection2.close(); - } - } -} - diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index c4b94a6f36..6883a09f1b 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -39,6 +39,7 @@ import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; +import java.util.Enumeration; /** * @author Apache Software Foundation @@ -84,6 +85,12 @@ public class JMSPropertiesTest extends QpidTestCase sentMsg.setJMSType(JMS_TYPE); sentMsg.setJMSReplyTo(JMS_REPLY_TO); + String JMSXGroupID_VALUE = "group"; + sentMsg.setStringProperty("JMSXGroupID", JMSXGroupID_VALUE); + + int JMSXGroupSeq_VALUE = 1; + sentMsg.setIntProperty("JMSXGroupSeq", JMSXGroupSeq_VALUE); + // send it producer.send(sentMsg); @@ -100,6 +107,30 @@ public class JMSPropertiesTest extends QpidTestCase // assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode()); assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType()); assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo()); + assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:")); + + //Validate that the JMSX values are correct + assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID")); + assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq")); + + boolean JMSXGroupID_Available = false; + boolean JMSXGroupSeq_Available = false; + Enumeration props = con.getMetaData().getJMSXPropertyNames(); + while (props.hasMoreElements()) + { + String name = (String) props.nextElement(); + if (name.equals("JMSXGroupID")) + { + JMSXGroupID_Available = true; + } + if (name.equals("JMSXGroupSeq")) + { + JMSXGroupSeq_Available = true; + } + } + + assertTrue("JMSXGroupID not available.",JMSXGroupID_Available); + assertTrue("JMSXGroupSeq not available.",JMSXGroupSeq_Available); con.close(); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 23bd2a960a..50ed1dee9e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -41,6 +41,14 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicSubscriber; +/** + * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as + * a static on a base test helper class, e.g. TestUtils. + * + * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored + * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit, + * driven by the test model. + */ public class DurableSubscriptionTest extends QpidTestCase { private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class); @@ -113,12 +121,26 @@ public class DurableSubscriptionTest extends QpidTestCase con.close(); } - public void testDurability() throws Exception + public void testDurabilityAUTOACK() throws Exception { + durabilityImpl(Session.AUTO_ACKNOWLEDGE); + } + public void testDurabilityNOACKSessionPerConnection() throws Exception + { + durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE); + } + + public void testDurabilityAUTOACKSessionPerConnection() throws Exception + { + durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE); + } + + private void durabilityImpl(int ackMode) throws Exception + { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic"); - Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic(con, "MyTopic"); + Session session1 = con.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -144,10 +166,82 @@ public class DurableSubscriptionTest extends QpidTestCase consumer2.close(); - Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + producer.send(session1.createTextMessage("B")); + + _logger.info("Receive message on consumer 1 :expecting B"); + msg = consumer1.receive(500); + assertNotNull("Consumer 1 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); + msg = consumer1.receive(500); + assertNull("There should be no more messages for consumption on consumer1.", msg); + + Session session3 = con.createSession(false, ackMode); MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); - producer.send(session1.createTextMessage("B")); + _logger.info("Receive message on consumer 3 :expecting B"); + msg = consumer3.receive(500); + assertNotNull("Consumer 3 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 3 :expecting null"); + msg = consumer3.receive(500); + assertNull("There should be no more messages for consumption on consumer3.", msg); + + consumer1.close(); + consumer3.close(); + + con.close(); + } + + private void durabilityImplSessionPerConnection(int ackMode) throws Exception + { + Message msg; + // Create producer. + AMQConnection con0 = (AMQConnection) getConnection("guest", "guest"); + con0.start(); + Session session0 = con0.createSession(false, ackMode); + + AMQTopic topic = new AMQTopic(con0, "MyTopic"); + + Session sessionProd = con0.createSession(false, ackMode); + MessageProducer producer = sessionProd.createProducer(topic); + + // Create consumer 1. + AMQConnection con1 = (AMQConnection) getConnection("guest", "guest"); + con1.start(); + Session session1 = con1.createSession(false, ackMode); + + MessageConsumer consumer1 = session0.createConsumer(topic); + + // Create consumer 2. + AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); + con2.start(); + Session session2 = con2.createSession(false, ackMode); + + TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); + + // Send message and check that both consumers get it and only it. + producer.send(session0.createTextMessage("A")); + + msg = consumer1.receive(500); + assertNotNull("Message should be available", msg); + assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText()); + msg = consumer1.receive(500); + assertNull("There should be no more messages for consumption on consumer1.", msg); + + msg = consumer2.receive(); + assertNotNull(msg); + assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); + msg = consumer2.receive(500); + assertNull("There should be no more messages for consumption on consumer2.", msg); + + // Detach the durable subscriber. + consumer2.close(); + session2.close(); + con2.close(); + + // Send message and receive on open consumer. + producer.send(session0.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(1000); @@ -156,17 +250,27 @@ public class DurableSubscriptionTest extends QpidTestCase msg = consumer1.receive(1000); assertEquals(null, msg); + // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed. + AMQConnection con3 = (AMQConnection) getConnection("guest", "guest"); + con3.start(); + Session session3 = con3.createSession(false, ackMode); + + TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); + _logger.info("Receive message on consumer 3 :expecting B"); - msg = consumer3.receive(1000); - assertEquals("B", ((TextMessage) msg).getText()); + msg = consumer3.receive(500); + assertNotNull("Consumer 3 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); - msg = consumer3.receive(1000); - assertEquals(null, msg); - // we need to unsubscribe as the session is NO_ACKNOWLEDGE - // messages for the durable subscriber are not deleted so the test cannot - // be run twice in a row - session2.unsubscribe("MySubscription"); - con.close(); + msg = consumer3.receive(500); + assertNull("There should be no more messages for consumption on consumer3.", msg); + + consumer1.close(); + consumer3.close(); + + con0.close(); + con1.close(); + con3.close(); } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 495cc98f31..f2f35644c6 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -409,9 +409,9 @@ public class CommitRollbackTest extends QpidTestCase } else { - _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); + _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); assertFalse("Already received message two", _gottwo); - + assertFalse("Already received message redelivered two", _gottwoRedelivered); _gottwo = true; } } @@ -419,6 +419,13 @@ public class CommitRollbackTest extends QpidTestCase verifyMessages(_consumer.receive(1000)); } + /** + * This test sends two messages receives on of them but doesn't ack it. + * The consumer is then closed + * the first message should be returned as redelivered. + * the second message should be delivered normally. + * @throws Exception + */ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -437,6 +444,7 @@ public class CommitRollbackTest extends QpidTestCase assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); _logger.info("Closing Consumer"); + _consumer.close(); _logger.info("Creating New consumer"); -- cgit v1.2.1