summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-05-29 11:38:52 +0000
committerRobert Gemmell <robbie@apache.org>2012-05-29 11:38:52 +0000
commit24ee7287390fd303d34f50c820c217ca9bb881bd (patch)
tree66b6a43fb66011095f9b775bb09bd11c44a8cbeb /qpid/java
parent8fcda77476c6e20c77d7572d87da20ef3c38941d (diff)
downloadqpid-python-24ee7287390fd303d34f50c820c217ca9bb881bd.tar.gz
QPID-4017: reset the session flow control blocked status during failover.
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java87
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes2
3 files changed, 96 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index aa5981b81f..68ad794f31 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -285,7 +285,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
+
+ // reset the flow control flag
+ // on opening channel, broker sends flow blocked if virtual host is blocked
+ // if virtual host is not blocked, then broker does not send flow command
+ // that's why we need to reset the flow control flag
+ s.setFlowControl(true);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted());
+
s.resubscribe();
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index e3d0b8bdbf..62b6c784e2 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -20,6 +20,9 @@ package org.apache.qpid.client.failover;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
@@ -44,7 +47,9 @@ import javax.naming.NamingException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -831,6 +836,88 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
}
}
}
+ public void testFlowControlFlagResetOnFailover() throws Exception
+ {
+ // we do not need the connection failing to second broker
+ _connection.close();
+
+ // make sure that failover timeout is bigger than flow control timeout
+ setTestSystemProperty("qpid.failover_method_timeout", "60000");
+ setTestSystemProperty("qpid.flow_control_wait_failure", "10000");
+
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
+ final AtomicInteger counter = new AtomicInteger();
+ // try to send 5 messages (should block after 4)
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ MessageProducer producer = producerSession.createProducer(queue);
+ for (int i=0; i < 5; i++)
+ {
+ Message next = createNextMessage(producerSession, i);
+ producer.send(next);
+ producerSession.commit();
+ counter.incrementAndGet();
+ }
+ }
+ catch(Exception e)
+ {
+ // ignore
+ }
+ }
+ }).start();
+
+ long limit= 30000l;
+ long start = System.currentTimeMillis();
+
+ // wait until session is blocked
+ while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
+ {
+ Thread.sleep(100l);
+ }
+
+ assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+ assertEquals("Unexpected number of sent messages", 4, counter.get());
+
+ killBroker();
+ startBroker();
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ awaitForFailoverCompletion(60000l);
+
+ assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity", capacity);
+ arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+ ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments);
+ Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
+ + "'&autodelete='" + true + "'");
+ ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
+ return queue;
+ }
private AMQConnection createConnectionWithFailover() throws NamingException, JMSException
{
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index 3ad8891061..e34cd6b694 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -57,8 +57,10 @@ org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
// These tests test the behaviour of 0-8..-0-9-1 specific system property qpid.failover_method_timeout
org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutExpires
org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutReconnected
+org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOnFailover
org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutExpires
org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutReconnected
+org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowControlFlagResetOnFailover
// QPID-3604: Immediate Prefetch no longer supported by 0-10
org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener