summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-04-24 01:54:20 +0000
committerAidan Skinner <aidan@apache.org>2008-04-24 01:54:20 +0000
commit559e9702d5a7c26dddee708e267f2f685d4de89e (patch)
treeb3114d58b39092b4699186533a50553715e42b96 /java/client/src/test
parent04fe7be0efbc3687a5a302fea6fbec82adbfedae (diff)
downloadqpid-python-559e9702d5a7c26dddee708e267f2f685d4de89e.tar.gz
QPID-832 merge M2.x
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651133 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java12
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java187
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTest.java (renamed from java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java)4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java15
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java13
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java123
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java44
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java104
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java19
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (renamed from java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTests.java)2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java31
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java132
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java12
19 files changed, 591 insertions, 157 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
index 8724c65b61..fe418535d6 100644
--- a/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
@@ -126,7 +126,7 @@ public class AMQQueueDeferredOrderingTest extends TestCase
_logger.info("Consuming messages");
for (int i = 0; i < NUM_MESSAGES; i++)
{
- Message msg = consumer.receive(1500);
+ Message msg = consumer.receive(3000);
assertNotNull("Message should not be null", msg);
assertTrue("Message should be a text message", msg instanceof TextMessage);
assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 75e50ee09b..136b9b94b6 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -20,13 +20,17 @@
*/
package org.apache.qpid.client;
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.testutil.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -34,7 +38,9 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
index 2eb511f8cd..12b84b1495 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -20,13 +20,17 @@
*/
package org.apache.qpid.client;
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.testutil.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -34,6 +38,9 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
diff --git a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
index 882915fb8f..c920499a07 100644
--- a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -29,7 +29,16 @@ import org.apache.qpid.testutil.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.spi.InitialContextFactory;
@@ -65,12 +74,15 @@ public class ResetMessageListenerTest extends QpidTestCase
private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); // all messages Sent Lock
private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); // all messages Sent Lock
private final CountDownLatch _allFirstMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
- private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
+ private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
+
+ private String oldImmediatePrefetch;
protected void setUp() throws Exception
{
super.setUp();
+ oldImmediatePrefetch = System.getProperty(AMQSession.IMMEDIATE_PREFETCH);
System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
_clientConnection = getConnection("guest", "guest");
@@ -109,8 +121,12 @@ public class ResetMessageListenerTest extends QpidTestCase
{
_clientConnection.close();
- _producerConnection.close();
super.tearDown();
+ if (oldImmediatePrefetch == null)
+ {
+ oldImmediatePrefetch = AMQSession.IMMEDIATE_PREFETCH_DEFAULT;
+ }
+ System.setProperty(AMQSession.IMMEDIATE_PREFETCH, oldImmediatePrefetch);
}
public void testAsynchronousRecieve()
@@ -238,7 +254,7 @@ public class ResetMessageListenerTest extends QpidTestCase
try
{
- _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ _allSecondMessagesSent.await(5000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index 737daeb350..61ba3aad3a 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -60,6 +60,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener
private final List<String> messages = new ArrayList<String>();
private int _count = 1;
public String _connectionString = "vm://:1";
+ private static final String USERNAME = "guest";
protected void setUp() throws Exception
{
@@ -171,7 +172,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener
m.setJMSReplyTo(q);
m.setStringProperty("TempQueue", q.toString());
- _logger.trace("Message:" + m);
+ _logger.debug("Message:" + m);
Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(),
m.getStringProperty("TempQueue"));
@@ -287,7 +288,14 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener
((AMQMessage) m).setVoidProperty(new AMQShortString("void"));
Assert.assertTrue("Check void properties are correctly transported",
- ((AMQMessage) m).getPropertyHeaders().containsKey("void"));
+ ((AMQMessage) m).getPropertyHeaders().containsKey("void"));
+
+ //JMSXUserID
+ if (m.getStringProperty("JMSXUserID") != null)
+ {
+ Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME,
+ m.getStringProperty("JMSXUserID"));
+ }
}
received.clear();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
index ed4f6041df..987b30ce28 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -21,7 +21,7 @@
package org.apache.qpid.test.unit.basic;
import junit.framework.TestCase;
-
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -29,11 +29,14 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.testutil.QpidTestCase;
+import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -47,11 +50,12 @@ public class SelectorTest extends QpidTestCase implements MessageListener
private AMQSession _session;
private int count;
public String _connectionString = "vm://:1";
+ private static final String INVALID_SELECTOR = "Cost LIKE 5";
protected void setUp() throws Exception
{
super.setUp();
- init((AMQConnection) getConnection("guest", "guest"));
+ init((AMQConnection) getConnection("guest", "guest"));
}
protected void tearDown() throws Exception
@@ -59,19 +63,19 @@ public class SelectorTest extends QpidTestCase implements MessageListener
super.tearDown();
}
- private void init(AMQConnection connection) throws Exception
+ private void init(AMQConnection connection) throws JMSException
{
init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true));
}
- private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ private void init(AMQConnection connection, AMQDestination destination) throws JMSException
{
_connection = connection;
_destination = destination;
connection.start();
String selector = null;
- // selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+ selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
_session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -79,13 +83,17 @@ public class SelectorTest extends QpidTestCase implements MessageListener
_session.createConsumer(destination, selector).setMessageListener(this);
}
- public synchronized void test() throws JMSException, InterruptedException
+ public synchronized void test() throws Exception
{
try
{
+
+ init((AMQConnection) getConnection("guest", "guest", randomize("Client")));
+
Message msg = _session.createTextMessage("Message");
msg.setJMSPriority(1);
msg.setIntProperty("Cost", 2);
+ msg.setStringProperty("property-with-hyphen", "wibble");
msg.setJMSType("Special");
_logger.info("Sending Message:" + msg);
@@ -105,10 +113,147 @@ public class SelectorTest extends QpidTestCase implements MessageListener
// throw new RuntimeException("Did not get message!");
}
}
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ System.out.println("SUCCESS!!");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ }
+ catch (URLSyntaxException e)
+ {
+ _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ fail("Wrong exception");
+ }
+ catch (AMQException e)
+ {
+ _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ fail("Wrong exception");
+ }
+
finally
{
- _session.close();
- _connection.close();
+ if (_session != null)
+ {
+ _session.close();
+ }
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ }
+
+
+ public void testInvalidSelectors() throws Exception
+ {
+ Connection connection = null;
+
+ try
+ {
+ connection = getConnection("guest", "guest", randomize("Client"));
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+ catch (JMSException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail("Error:" + e.getMessage());
+ }
+
+ //Try Creating a Browser
+ try
+ {
+ _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
+ //Try Creating a Consumer
+ try
+ {
+ _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
+ //Try Creating a Receiever
+ try
+ {
+ _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
+ finally
+ {
+ if (_session != null)
+ {
+ try
+ {
+ _session.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Error cleaning up:" + e.getMessage());
+ }
+ }
+ if (_connection != null)
+ {
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Error cleaning up:" + e.getMessage());
+ }
+ }
}
}
@@ -127,9 +272,29 @@ public class SelectorTest extends QpidTestCase implements MessageListener
public static void main(String[] argv) throws Exception
{
SelectorTest test = new SelectorTest();
- test._connectionString = (argv.length == 0) ? "localhost:5672" : argv[0];
- test.setUp();
- test.test();
+ test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+
+ try
+ {
+ while (true)
+ {
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.setUp();
+ }
+ test.test();
+
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.tearDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTest.java
index 10c054a863..6f1ddebb0c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTest.java
@@ -35,9 +35,9 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-public class CloseTests extends QpidTestCase
+public class CloseTest extends QpidTestCase
{
- private static final Logger _logger = LoggerFactory.getLogger(CloseTests.class);
+ private static final Logger _logger = LoggerFactory.getLogger(CloseTest.class);
private static final String BROKER = "vm://:1";
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
index 6a4e01affd..965c22af4a 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
@@ -75,18 +75,11 @@ public class AMQSessionTest extends QpidTestCase
public void testCreateDurableSubscriber() throws JMSException
{
- try
- {
- TopicSubscriber subscriber = _session.createDurableSubscriber(_topic, "mysubname");
- assertEquals("Topic names should match from durable TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+ TopicSubscriber subscriber = _session.createDurableSubscriber(_topic, "mysubname");
+ assertEquals("Topic names should match from durable TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName());
- subscriber = _session.createDurableSubscriber(_topic, "mysubname", "abc", false);
- assertEquals("Topic names should match from durable TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName());
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- }
+ subscriber = _session.createDurableSubscriber(_topic, "mysubname2", "abc", false);
+ assertEquals("Topic names should match from durable TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName());
}
public void testCreateQueueReceiver() throws JMSException
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
index 85fcf6d95a..b6776a1a44 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
@@ -37,7 +37,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener<ChannelCloseBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
@@ -48,14 +48,15 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
throws AMQException
{
_logger.debug("ChannelClose method received");
- ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
@@ -95,6 +96,6 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe
}
- protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ session.channelClosed(channelId, errorCode, String.valueOf(reason));
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
index 08d6b0bcab..45a9ca1dd6 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -26,17 +26,13 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.testutil.QpidTestCase;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
@@ -53,6 +49,9 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
public class ChannelCloseTest extends QpidTestCase implements ExceptionListener, ConnectionListener
{
@@ -140,8 +139,11 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener,
/*
close channel and send guff then send ok no errors
+ REMOVE TEST - The behaviour after server has sent close is undefined.
+ the server should be free to fail as it may wish to reclaim its resources
+ immediately after close.
*/
- public void testSendingMethodsAfterClose() throws Exception
+ /*public void testSendingMethodsAfterClose() throws Exception
{
// this is testing an 0.8 connection
if(isBroker08())
@@ -167,7 +169,19 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener,
// Set StateManager to manager that ignores Close-oks
AMQProtocolSession protocolSession=
((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
- AMQStateManager newStateManager=new NoCloseOKStateManager(protocolSession);
+
+ MethodDispatcher d = protocolSession.getMethodDispatcher();
+
+ MethodDispatcher wrappedDispatcher = (MethodDispatcher)
+ Proxy.newProxyInstance(d.getClass().getClassLoader(),
+ d.getClass().getInterfaces(),
+ new MethodDispatcherProxyHandler(
+ (ClientMethodDispatcherImpl) d));
+
+ protocolSession.setMethodDispatcher(wrappedDispatcher);
+
+
+ AMQStateManager newStateManager=new NoCloseOKStateManager(protocolSession);
newStateManager.changeState(oldStateManager.getCurrentState());
((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager);
@@ -257,7 +271,7 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener,
}
}
}
-
+*/
private void createChannelAndTest(int channel) throws FailoverException
{
// Create A channel
@@ -284,10 +298,9 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener,
private void sendClose(int channel)
{
- AMQFrame frame =
- ChannelCloseOkBody.createAMQFrame(channel,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+ ChannelCloseOkBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody();
+ AMQFrame frame = body.generateFrame(channel);
((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
}
@@ -345,35 +358,43 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener,
private void declareExchange(int channelId, String _type, String _name, boolean nowait)
throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare =
- ExchangeDeclareBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- new AMQShortString(_name), // exchange
- false, // internal
- nowait, // nowait
- true, // passive
- 0, // ticket
- new AMQShortString(_type)); // type
-
- if (nowait)
- {
- ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare);
- }
- else
- {
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class,
- SYNC_TIMEOUT);
- }
+ ExchangeDeclareBody body =
+ ((AMQConnection) _connection).getProtocolHandler()
+ .getMethodRegistry()
+ .createExchangeDeclareBody(0,
+ new AMQShortString(_name),
+ new AMQShortString(_type),
+ true,
+ false,
+ false,
+ false,
+ nowait,
+ null);
+ AMQFrame exchangeDeclare = body.generateFrame(channelId);
+ AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();
+
+
+ if (nowait)
+ {
+ protocolHandler.writeFrame(exchangeDeclare);
+ }
+ else
+ {
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ }
+
+// return null;
+// }
+// }, (AMQConnection)_connection).execute();
+
}
private void createChannel(int channelId) throws AMQException, FailoverException
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand
+ ChannelOpenBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand
ChannelOpenOkBody.class);
}
@@ -402,4 +423,28 @@ public class ChannelCloseTest extends QpidTestCase implements ExceptionListener,
public void failoverComplete()
{ }
+
+ private static final class MethodDispatcherProxyHandler implements InvocationHandler
+ {
+ private final ClientMethodDispatcherImpl _underlyingDispatcher;
+ private final ChannelCloseMethodHandlerNoCloseOk _handler = ChannelCloseMethodHandlerNoCloseOk.getInstance();
+
+
+ public MethodDispatcherProxyHandler(ClientMethodDispatcherImpl dispatcher)
+ {
+ _underlyingDispatcher = dispatcher;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ if(method.getName().equals("dispatchChannelClose"))
+ {
+ _handler.methodReceived(_underlyingDispatcher.getStateManager(),
+ (ChannelCloseBody) args[0], (Integer)args[1]);
+ }
+ Method dispatcherMethod = _underlyingDispatcher.getClass().getMethod(method.getName(), method.getParameterTypes());
+ return dispatcherMethod.invoke(_underlyingDispatcher, args);
+
+ }
+ }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
index d128f30727..c7eb745566 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
@@ -59,49 +59,7 @@ public class NoCloseOKStateManager extends AMQStateManager
super(protocolSession);
}
- protected void registerListeners()
- {
- Map frame2handlerMap = new HashMap();
-
- // we need to register a map for the null (i.e. all state) handlers otherwise you get
- // a stack overflow in the handler searching code when you present it with a frame for which
- // no handlers are registered
- //
- _state2HandlersMap.put(null, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
-
- //
- // ConnectionOpen handlers
- //
- frame2handlerMap = new HashMap();
- // Use Test Handler for Close methods to not send Close-OKs
- frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance());
-
- frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
- frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
- frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
- frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
- frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
- frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
- }
+
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 7eb74e2492..f856e8c20b 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -33,6 +33,7 @@ import org.apache.qpid.jms.Session;
import junit.framework.TestCase;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TopicSession;
@@ -55,25 +56,30 @@ public class ConnectionTest extends TestCase
TransportConnection.killVMBroker(1);
}
- public void testSimpleConnection()
+ public void testSimpleConnection() throws Exception
{
+ AMQConnection conn = null;
try
{
- AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test");
- conn.close();
+ conn = new AMQConnection(_broker, "guest", "guest", "fred", "test");
}
catch (Exception e)
{
fail("Connection to " + _broker + " should succeed. Reason: " + e);
}
+ finally
+ {
+ conn.close();
+ }
}
- public void testDefaultExchanges()
+ public void testDefaultExchanges() throws Exception
{
+ AMQConnection conn = null;
try
{
- AMQConnection conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='"
+ conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='"
+ _broker
+ "?retries='1''&defaultQueueExchange='test.direct'"
+ "&defaultTopicExchange='test.topic'"
@@ -106,37 +112,53 @@ public class ConnectionTest extends TestCase
topicSession.close();
-
- conn.close();
}
catch (Exception e)
{
fail("Connection to " + _broker + " should succeed. Reason: " + e);
}
+ finally
+ {
+ conn.close();
+ }
}
- //fixme AMQAuthenticationException is not propogaged
- public void PasswordFailureConnection() throws Exception
+ //See QPID-771
+ public void testPasswordFailureConnection() throws Exception
{
+ AMQConnection conn = null;
try
{
- new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''");
+ conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''");
fail("Connection should not be established password is wrong.");
}
catch (AMQException amqe)
{
- if (!(amqe instanceof AMQAuthenticationException))
+ if (amqe.getCause().getClass() == Exception.class)
+ {
+ System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure.");
+ return;
+ }
+
+ assertEquals("Exception was wrong type", JMSException.class, amqe.getCause().getClass());
+ Exception linked = ((JMSException) amqe.getCause()).getLinkedException();
+ assertEquals("Exception was wrong type", AMQAuthenticationException.class, linked.getClass());
+ }
+ finally
+ {
+ if (conn != null)
{
- fail("Correct exception not thrown. Excpected 'AMQAuthenticationException' got: " + amqe);
+ conn.close();
}
}
}
public void testConnectionFailure() throws Exception
{
+ AMQConnection conn = null;
try
{
- new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''");
+ conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''");
fail("Connection should not be established");
}
catch (AMQException amqe)
@@ -146,14 +168,22 @@ public class ConnectionTest extends TestCase
fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe);
}
}
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
public void testUnresolvedHostFailure() throws Exception
{
+ AMQConnection conn = null;
try
{
- new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''");
+ conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''");
fail("Connection should not be established");
}
catch (AMQException amqe)
@@ -163,6 +193,38 @@ public class ConnectionTest extends TestCase
fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe);
}
}
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ public void testUnresolvedVirtualHostFailure() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQConnectionFailureException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
public void testClientIdCannotBeChanged() throws Exception
@@ -180,7 +242,10 @@ public class ConnectionTest extends TestCase
}
finally
{
- connection.close();
+ if (connection != null)
+ {
+ connection.close();
+ }
}
}
@@ -188,7 +253,14 @@ public class ConnectionTest extends TestCase
{
Connection connection = new AMQConnection(_broker, "guest", "guest",
null, "test");
- assertNotNull(connection.getClientID());
+ try
+ {
+ assertNotNull(connection.getClientID());
+ }
+ finally
+ {
+ connection.close();
+ }
connection.close();
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index bfbba61913..27adc4dd77 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -473,6 +473,25 @@ public class ConnectionURLTest extends TestCase
}
+ public void testSocketProtocol() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'";
+
+ try
+ {
+ AMQConnectionURL curl = new AMQConnectionURL(url);
+ assertNotNull(curl);
+ assertEquals(1, curl.getBrokerCount());
+ assertNotNull(curl.getBrokerDetails(0));
+ assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport());
+ assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost());
+ assertEquals("URL does not toString as expected", url, curl.toString());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
+ }
public static junit.framework.Test suite()
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 3776ff767f..4cdd7dd7e8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -27,8 +27,9 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.testutil.QpidTestCase;
-public class AMQProtocolSessionTest extends TestCase
+public class AMQProtocolSessionTest extends QpidTestCase
{
private static class AMQProtSession extends AMQProtocolSession
{
@@ -50,7 +51,7 @@ public class AMQProtocolSessionTest extends TestCase
}
//private Strings for test values and expected results
- private String _brokenAddress;
+ private String _brokenAddress = "tcp://myAddress;:";;
private String _generatedAddress;
private String _emptyAddress;
private String _generatedAddress_2;
@@ -64,7 +65,7 @@ public class AMQProtocolSessionTest extends TestCase
super.setUp();
//don't care about the values set here apart from the dummy IoSession
- _testSession = new AMQProtSession(null,new TestIoSession(),null);
+ _testSession = new AMQProtSession(null,new TestIoSession(), (AMQConnection) getConnection("guest", "guest"));
//initialise addresses for test and expected results
_port = 123;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
index e78c295ce5..d25986d991 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
@@ -29,8 +29,8 @@ import org.apache.qpid.testutil.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import uk.co.thebadgerset.junit.concurrency.TestRunnable;
-import uk.co.thebadgerset.junit.concurrency.ThreadTestCoordinator;
+import org.apache.qpid.junit.concurrency.TestRunnable;
+import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
import javax.jms.Connection;
import javax.jms.Message;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
index af19db5128..9caba63fe4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTests.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
@@ -25,7 +25,7 @@ import org.apache.qpid.testutil.QpidTestCase;
* Crash Recovery tests for durable subscription
*
*/
-public class DurableSubscriberTests extends QpidTestCase
+public class DurableSubscriberTest extends QpidTestCase
{
private final String _topicName = "durableSubscriberTopic";
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index c4b94a6f36..6883a09f1b 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -39,6 +39,7 @@ import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
+import java.util.Enumeration;
/**
* @author Apache Software Foundation
@@ -84,6 +85,12 @@ public class JMSPropertiesTest extends QpidTestCase
sentMsg.setJMSType(JMS_TYPE);
sentMsg.setJMSReplyTo(JMS_REPLY_TO);
+ String JMSXGroupID_VALUE = "group";
+ sentMsg.setStringProperty("JMSXGroupID", JMSXGroupID_VALUE);
+
+ int JMSXGroupSeq_VALUE = 1;
+ sentMsg.setIntProperty("JMSXGroupSeq", JMSXGroupSeq_VALUE);
+
// send it
producer.send(sentMsg);
@@ -100,6 +107,30 @@ public class JMSPropertiesTest extends QpidTestCase
// assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode());
assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType());
assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo());
+ assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:"));
+
+ //Validate that the JMSX values are correct
+ assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID"));
+ assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq"));
+
+ boolean JMSXGroupID_Available = false;
+ boolean JMSXGroupSeq_Available = false;
+ Enumeration props = con.getMetaData().getJMSXPropertyNames();
+ while (props.hasMoreElements())
+ {
+ String name = (String) props.nextElement();
+ if (name.equals("JMSXGroupID"))
+ {
+ JMSXGroupID_Available = true;
+ }
+ if (name.equals("JMSXGroupSeq"))
+ {
+ JMSXGroupSeq_Available = true;
+ }
+ }
+
+ assertTrue("JMSXGroupID not available.",JMSXGroupID_Available);
+ assertTrue("JMSXGroupSeq not available.",JMSXGroupSeq_Available);
con.close();
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 23bd2a960a..50ed1dee9e 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -41,6 +41,14 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
+/**
+ * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
+ * a static on a base test helper class, e.g. TestUtils.
+ *
+ * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored
+ * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit,
+ * driven by the test model.
+ */
public class DurableSubscriptionTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
@@ -113,12 +121,26 @@ public class DurableSubscriptionTest extends QpidTestCase
con.close();
}
- public void testDurability() throws Exception
+ public void testDurabilityAUTOACK() throws Exception
{
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+ public void testDurabilityNOACKSessionPerConnection() throws Exception
+ {
+ durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testDurabilityAUTOACKSessionPerConnection() throws Exception
+ {
+ durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private void durabilityImpl(int ackMode) throws Exception
+ {
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic");
- Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic(con, "MyTopic");
+ Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -144,10 +166,82 @@ public class DurableSubscriptionTest extends QpidTestCase
consumer2.close();
- Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ producer.send(session1.createTextMessage("B"));
+
+ _logger.info("Receive message on consumer 1 :expecting B");
+ msg = consumer1.receive(500);
+ assertNotNull("Consumer 1 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ Session session3 = con.createSession(false, ackMode);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
- producer.send(session1.createTextMessage("B"));
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(500);
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 3 :expecting null");
+ msg = consumer3.receive(500);
+ assertNull("There should be no more messages for consumption on consumer3.", msg);
+
+ consumer1.close();
+ consumer3.close();
+
+ con.close();
+ }
+
+ private void durabilityImplSessionPerConnection(int ackMode) throws Exception
+ {
+ Message msg;
+ // Create producer.
+ AMQConnection con0 = (AMQConnection) getConnection("guest", "guest");
+ con0.start();
+ Session session0 = con0.createSession(false, ackMode);
+
+ AMQTopic topic = new AMQTopic(con0, "MyTopic");
+
+ Session sessionProd = con0.createSession(false, ackMode);
+ MessageProducer producer = sessionProd.createProducer(topic);
+
+ // Create consumer 1.
+ AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
+ con1.start();
+ Session session1 = con1.createSession(false, ackMode);
+
+ MessageConsumer consumer1 = session0.createConsumer(topic);
+
+ // Create consumer 2.
+ AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
+ con2.start();
+ Session session2 = con2.createSession(false, ackMode);
+
+ TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+
+ // Send message and check that both consumers get it and only it.
+ producer.send(session0.createTextMessage("A"));
+
+ msg = consumer1.receive(500);
+ assertNotNull("Message should be available", msg);
+ assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ msg = consumer2.receive();
+ assertNotNull(msg);
+ assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
+ msg = consumer2.receive(500);
+ assertNull("There should be no more messages for consumption on consumer2.", msg);
+
+ // Detach the durable subscriber.
+ consumer2.close();
+ session2.close();
+ con2.close();
+
+ // Send message and receive on open consumer.
+ producer.send(session0.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(1000);
@@ -156,17 +250,27 @@ public class DurableSubscriptionTest extends QpidTestCase
msg = consumer1.receive(1000);
assertEquals(null, msg);
+ // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+ AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
+ con3.start();
+ Session session3 = con3.createSession(false, ackMode);
+
+ TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
+
_logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(1000);
- assertEquals("B", ((TextMessage) msg).getText());
+ msg = consumer3.receive(500);
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(1000);
- assertEquals(null, msg);
- // we need to unsubscribe as the session is NO_ACKNOWLEDGE
- // messages for the durable subscriber are not deleted so the test cannot
- // be run twice in a row
- session2.unsubscribe("MySubscription");
- con.close();
+ msg = consumer3.receive(500);
+ assertNull("There should be no more messages for consumption on consumer3.", msg);
+
+ consumer1.close();
+ consumer3.close();
+
+ con0.close();
+ con1.close();
+ con3.close();
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 495cc98f31..f2f35644c6 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -409,9 +409,9 @@ public class CommitRollbackTest extends QpidTestCase
}
else
{
- _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
+ _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
assertFalse("Already received message two", _gottwo);
-
+ assertFalse("Already received message redelivered two", _gottwoRedelivered);
_gottwo = true;
}
}
@@ -419,6 +419,13 @@ public class CommitRollbackTest extends QpidTestCase
verifyMessages(_consumer.receive(1000));
}
+ /**
+ * This test sends two messages receives on of them but doesn't ack it.
+ * The consumer is then closed
+ * the first message should be returned as redelivered.
+ * the second message should be delivered normally.
+ * @throws Exception
+ */
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -437,6 +444,7 @@ public class CommitRollbackTest extends QpidTestCase
assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
_logger.info("Closing Consumer");
+
_consumer.close();
_logger.info("Creating New consumer");