summaryrefslogtreecommitdiff
path: root/qpid/java/systests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java135
1 files changed, 131 insertions, 4 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
index 11e345de5e..2274af520b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.server.logging;
+import junit.framework.AssertionFailedError;
+import org.apache.qpid.client.AMQConnection;
+
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
@@ -37,6 +41,7 @@ import java.util.List;
*
* SUB-1001 : Create : [Durable] [Arguments : <key=value>]
* SUB-1002 : Close
+ * SUB-1003 : State : <state>
*/
public class SubscriptionLoggingTest extends AbstractTestLogging
{
@@ -69,7 +74,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
*
* 1. Running Broker
* 2. Create a new Subscription to a transient queue/topic.
- * Output:
+ * Output: 6
*
* <date> SUB-1001 : Create
*
@@ -170,7 +175,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
assertTrue("AutoClose not on log message:" + message, message.contains("AutoClose"));
// Beacause it is an auto close and we have no messages on the queue we
- // will get a close message
+ // will get a close message
log = getLog(results.get(1));
validateMessageID("SUB-1002", log);
@@ -276,8 +281,6 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
{
_session.createConsumer(_queue).close();
-
-
//Validate
List<String> results = _monitor.findMatches(SUB_PREFIX);
@@ -296,4 +299,128 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
}
+ /**
+ * Description:
+ * When a Subscription fills its prefetch it will become suspended. This
+ * will be logged as a SUB-1003 message.
+ * Input:
+ *
+ * 1. Running broker
+ * 2. Message Producer to put more data on the queue than the client's prefetch
+ * 3. Client that ensures that its prefetch becomes full
+ * Output:
+ *
+ * <date> SUB-1003 : State : <state>
+ *
+ * Validation Steps:
+ * 1. The SUB ID is correct
+ * 2. The state is correct
+ *
+ * @throws java.io.IOException - if there is a problem getting the matches
+ * @throws javax.jms.JMSException - if there is a problem creating the consumer
+ */
+ public void testSubscriptionSuspend() throws Exception, IOException
+ {
+ //Close session with large prefetch
+ _connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close();
+
+ int PREFETCH = 15;
+
+ //Create new session with small prefetch
+ _session = ((AMQConnection) _connection).createSession(false, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+
+ MessageConsumer consumer = _session.createConsumer(_queue);
+
+ _connection.start();
+
+ //Fill the prefetch and two extra so that our receive bellow allows the
+ // subscription to become active then return to a suspended state.
+ sendMessage(_session, _queue, 17);
+
+ // Retreive the first message, and start the flow of messages
+ assertNotNull("First message not retreived", consumer.receive(1000));
+
+ //Give the internal broker time to respond to the ack that the above
+ // receive will perform.
+ if (!isExternalBroker())
+ {
+ Thread.sleep(1000);
+ }
+
+ _connection.close();
+
+ //Validate
+ List<String> results = _monitor.findMatches("SUB-1003");
+
+ try
+ {
+ // Validation expects three messages.
+ // The first will be logged by the QueueActor as part of the processQueue thread
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+ // The second will be by the connnection as it acknowledges and activates the subscription
+// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
+ // The final one will be the subscription suspending as part of the SubFlushRunner
+// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+
+ assertEquals("Result set larger than expected.", 3, results.size());
+
+ // Validate Initial Suspension
+ String expectedState = "SUSPENDED";
+ String log = getLog(results.get(0));
+ validateSubscriptionState(log, expectedState);
+
+ // Validate that the logActor is the the queue
+ String actor = fromActor(log);
+ assertTrue("Actor string does not contain expected queue("
+ + _queue.getQueueName() + ") name." + actor,
+ actor.contains("qu(" + _queue.getQueueName() + ")"));
+
+ // After being suspended the subscription should become active.
+ expectedState = "ACTIVE";
+ log = getLog(results.get(1));
+ validateSubscriptionState(log, expectedState);
+ // Validate we have a connection Actor
+ actor = fromActor(log);
+ assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:"));
+
+ // Validate that it was re-suspended
+ expectedState = "SUSPENDED";
+ log = getLog(results.get(2));
+ validateSubscriptionState(log, expectedState);
+ // Validate we have a subscription Actor
+ actor = fromActor(log);
+ assertTrue("The actor is not a subscription actor:" + actor, actor.startsWith("sub:"));
+
+ }
+ catch (AssertionFailedError afe)
+ {
+ System.err.println("Log Dump:");
+ for (String log : results)
+ {
+ System.err.println(log);
+ }
+ throw afe;
+ }
+
+ }
+
+ /**
+ * Validate that the given log statement is a well formatted SUB-1003
+ * message. That means the ID and expected state are correct.
+ *
+ * @param log the log to test
+ * @param expectedState the state that should be logged.
+ */
+ private void validateSubscriptionState(String log, String expectedState)
+ {
+ validateMessageID("SUB-1003", log);
+ String logMessage = getMessageString(fromMessage(log));
+ assertTrue("Log Message does not start with 'State'" + logMessage,
+ logMessage.startsWith("State"));
+
+ assertTrue("Log Message does not have expected State of '"
+ + expectedState + "'" + logMessage,
+ logMessage.endsWith(expectedState));
+ }
+
}