diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-02-13 00:30:34 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-02-13 00:30:34 +0000 |
| commit | d15fc6f29484cbd05e95b584beeb6e1a59da1367 (patch) | |
| tree | 387ab3b78951c497dc39e307a0a1fb6dc789e09a /java/systests/src | |
| parent | 26e07c83dd4df84f61767de0c16d6732e8a6e30d (diff) | |
| download | qpid-python-d15fc6f29484cbd05e95b584beeb6e1a59da1367.tar.gz | |
QPID-3829: use a seperate object for reference checking to stop the AMQMessage holding its underlying 0-8/0-9/0-9-1 connection/io objects in memory after they are closed. Also stops an NPE on the 0-8/0-9/0-9-1 subscriptions when evaluating no-local after store recovery.
Enables NoLocalAfterRecoveryTest again, though updated to make it simpler and more reliable. This test should be removed if changes for QPID-3605 are undertaken.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1243379 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java | 171 |
1 files changed, 30 insertions, 141 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java index a95d07ec45..2e259191aa 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java @@ -20,14 +20,8 @@ */ package org.apache.qpid.server.persistent; -import org.apache.commons.configuration.XMLConfiguration; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.server.store.DerbyMessageStore; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.ArrayList; +import java.util.List; import javax.jms.Connection; import javax.jms.JMSException; @@ -36,60 +30,28 @@ import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; /** - * QPID-1813 : We do not store the client id with a message so on store restart - * that information is lost and we are unable to perform no local checks. - * - * QPID-1813 highlights the lack of testing here as the broker will NPE as it - * assumes that the client id of the publisher will always exist + * Verifies that after recovery, a new Connection with no-local in use is + * able to receive messages sent prior to the broker restart. */ -public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements ConnectionListener +public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase { protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName(); protected static final int SEND_COUNT = 10; - private CountDownLatch _failoverComplete = new CountDownLatch(1); - - protected ConnectionURL _connectionURL; - - @Override - protected void setUp() throws Exception - { - - XMLConfiguration configuration = new XMLConfiguration(_configFile); - configuration.setProperty("virtualhosts.virtualhost.test.store.class", "org.apache.qpid.server.store.DerbyMessageStore"); - configuration.setProperty("virtualhosts.virtualhost.test.store."+ DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY, - System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + File.separator + "derbyDB-NoLocalAfterRecoveryTest"); - - File tmpFile = File.createTempFile("configFile", "test"); - tmpFile.deleteOnExit(); - configuration.save(tmpFile); - - _configFile = tmpFile; - _connectionURL = getConnectionURL(); - - BrokerDetails details = _connectionURL.getBrokerDetails(0); - - // This will attempt to failover for 3 seconds. - // Local testing suggests failover takes 2 seconds - details.setProperty(BrokerDetails.OPTIONS_RETRY, "10"); - details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500"); - - super.setUp(); - } public void test() throws Exception { + if(!isBrokerStorePersistent()) + { + fail("This test requires a broker with a persistent store"); + } - Connection connection = getConnection(_connectionURL); + Connection connection = getConnection(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - Topic topic = (Topic) getInitialContext().lookup("topic"); + Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME); TopicSubscriber noLocalSubscriber = session. createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", @@ -99,88 +61,40 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements Conn createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-Normal", null, false); - List<Message> sent = sendMessage(session, topic, SEND_COUNT); - - session.commit(); - - assertEquals("Incorrect number of messages sent", - SEND_COUNT, sent.size()); - + sendMessage(session, topic, SEND_COUNT); // Check messages can be received as expected. connection.start(); - assertTrue("No Local Subscriber is not a no-local subscriber", - noLocalSubscriber.getNoLocal()); - - assertFalse("Normal Subscriber is a no-local subscriber", - normalSubscriber.getNoLocal()); - - List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT); assertEquals("No Local Subscriber Received messages", 0, received.size()); received = receiveMessage(normalSubscriber, SEND_COUNT); assertEquals("Normal Subscriber Received no messages", SEND_COUNT, received.size()); + session.commit(); + connection.close(); - - ((AMQConnection)connection).setConnectionListener(this); - + //We didn't receive the messages on the durable queue for the no-local subscriber + //so they are still on the broker. Restart the broker, prompting their recovery. restartBroker(); + Connection connection2 = getConnection(); + connection2.start(); - //Await - if (!_failoverComplete.await(4000L, TimeUnit.MILLISECONDS)) - { - fail("Failover Failed to compelete"); - } - - session.rollback(); - - //Failover will restablish our clients - assertTrue("No Local Subscriber is not a no-local subscriber", - noLocalSubscriber.getNoLocal()); - - assertFalse("Normal Subscriber is a no-local subscriber", - normalSubscriber.getNoLocal()); + Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED); + Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME); + TopicSubscriber noLocalSubscriber2 = session2. + createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", + null, true); - // NOTE : here that the NO-local subscriber actually now gets ALL the - // messages as the connection has failed and they are consuming on a - // different connnection to the one that was published on. - received = receiveMessage(noLocalSubscriber, SEND_COUNT); + // The NO-local subscriber should now get ALL the messages + // as they are being consumed on a different connection to + // the one that they were published on. + received = receiveMessage(noLocalSubscriber2, SEND_COUNT); + session2.commit(); assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size()); - - received = receiveMessage(normalSubscriber, SEND_COUNT); - assertEquals("Normal Subscriber Received no messages", - SEND_COUNT, received.size()); - - //leave the store in a clean state. - session.commit(); - } - - protected List<Message> assertReceiveMessage(MessageConsumer messageConsumer, - int count) throws JMSException - { - - List<Message> receivedMessages = new ArrayList<Message>(count); - for (int i = 0; i < count; i++) - { - Message received = messageConsumer.receive(1000); - - if (received != null) - { - receivedMessages.add(received); - } - else - { - fail("Only " - + receivedMessages.size() + "/" + count + " received."); - } - } - - return receivedMessages; } protected List<Message> receiveMessage(MessageConsumer messageConsumer, @@ -204,29 +118,4 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements Conn return receivedMessages; } - - public void bytesSent(long count) - { - - } - - public void bytesReceived(long count) - { - - } - - public boolean preFailover(boolean redirect) - { - return true; - } - - public boolean preResubscribe() - { - return true; - } - - public void failoverComplete() - { - _failoverComplete.countDown(); - } } |
