diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/client/pom.xml | 6 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java | 147 | 
2 files changed, 153 insertions, 0 deletions
| diff --git a/java/client/pom.xml b/java/client/pom.xml index 0d6b3420ee..2e720cdb71 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -106,6 +106,12 @@              <scope>test</scope>          </dependency> +        <dependency> +            <groupId>uk.co.thebadgerset</groupId> +            <artifactId>junit-toolkit</artifactId> +            <scope>test</scope> +        </dependency> +          <!-- These need to be included at compile time only, for the retrotranslator verification to find them. -->          <dependency>              <groupId>net.sf.retrotranslator</groupId> diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java new file mode 100644 index 0000000000..5cee306846 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java @@ -0,0 +1,147 @@ +/*
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + *
 + */
 +package org.apache.qpid.test.unit.close;
 +
 +import javax.jms.Connection;
 +import javax.jms.Message;
 +import javax.jms.MessageListener;
 +import javax.jms.Session;
 +
 +import junit.framework.Assert;
 +import junit.framework.TestCase;
 +
 +import org.apache.log4j.Logger;
 +
 +import org.apache.qpid.client.AMQConnection;
 +import org.apache.qpid.client.transport.TransportConnection;
 +
 +import uk.co.thebadgerset.junit.concurrency.TestRunnable;
 +import uk.co.thebadgerset.junit.concurrency.ThreadTestCoordinator;
 +
 +/**
 + * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method.
 + * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent
 + * before closing the connection.
 + *
 + * <p><table id="crc"><caption>CRC Card</caption>
 + * <tr><th> Responsibilities <th> Collaborations
 + * <tr><td> Check that closing a connection whilst handling a message, blocks till completion of the handler.
 + * </table>
 + */
 +public class CloseBeforeAckTest extends TestCase
 +{
 +    private static final Logger log = Logger.getLogger(CloseBeforeAckTest.class);
 +
 +    Connection connection;
 +    Session session;
 +    public static final String TEST_QUEUE_NAME = "TestQueue";
 +
 +    class TestThread1 extends TestRunnable implements MessageListener
 +    {
 +        public void runWithExceptions() throws Exception
 +        {
 +            // Set this up to listen for message on the test session.
 +            session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this);
 +        }
 +
 +        public void onMessage(Message message)
 +        {
 +            // Give thread 2 permission to close the session.
 +            allow(new int[] { 1 });
 +
 +            // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete.
 +            waitFor(new int[] { 1 }, true);
 +        }
 +    }
 +
 +    TestThread1 testThread1 = new TestThread1();
 +
 +    TestRunnable testThread2 =
 +        new TestRunnable()
 +        {
 +            public void runWithExceptions() throws Exception
 +            {
 +                // Send a message to be picked up by thread 1.
 +                session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME),
 +                                                  session.createTextMessage("Hi there thread 1!"));
 +
 +                // Wait for thread 1 to pick up the message and give permission to continue.
 +                waitFor(new int[] { 0 }, false);
 +
 +                // Close the connection.
 +                session.close();
 +
 +                // Allow thread 1 to continue to completion, if it is erronously still waiting.
 +                allow(new int[] { 1 });
 +            }
 +        };
 +
 +    public void testCloseBeforeAutoAck_QPID_397() throws Exception
 +    {
 +        // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks
 +        // message at the end of the onMessage method, after a close has been sent.
 +        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 +
 +        ThreadTestCoordinator tt = new ThreadTestCoordinator(2);
 +
 +        tt.addTestThread(testThread1, 0);
 +        tt.addTestThread(testThread2, 1);
 +        tt.setDeadlockTimeout(500);
 +        tt.run();
 +
 +        String errorMessage = tt.joinAndRetrieveMessages();
 +
 +        // Print any error messages or exceptions.
 +        log.debug(errorMessage);
 +
 +        if (!tt.getExceptions().isEmpty())
 +        {
 +            for (Exception e : tt.getExceptions())
 +            {
 +                log.debug("Exception thrown during test thread: ", e);
 +            }
 +        }
 +
 +        Assert.assertTrue(errorMessage, "".equals(errorMessage));
 +    }
 +
 +    public void testCloseBeforeAutoAckManyTimes() throws Exception
 +    {
 +        for (int i = 0; i < 500; i++)
 +        {
 +            testCloseBeforeAutoAck_QPID_397();
 +        }
 +    }
 +
 +    protected void setUp() throws Exception
 +    {
 +        super.setUp();
 +        TransportConnection.createVMBroker(1);
 +
 +        connection = new AMQConnection("vm://:1", "guest", "guest", getName(), "test");
 +    }
 +
 +    protected void tearDown() throws Exception
 +    {
 +        super.tearDown();
 +        TransportConnection.killVMBroker(1);
 +    }
 +}
 | 
