diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-05-29 11:38:52 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-05-29 11:38:52 +0000 |
| commit | 24ee7287390fd303d34f50c820c217ca9bb881bd (patch) | |
| tree | 66b6a43fb66011095f9b775bb09bd11c44a8cbeb /qpid/java | |
| parent | 8fcda77476c6e20c77d7572d87da20ef3c38941d (diff) | |
| download | qpid-python-24ee7287390fd303d34f50c820c217ca9bb881bd.tar.gz | |
QPID-4017: reset the session flow control blocked status during failover.
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 96 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index aa5981b81f..68ad794f31 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -285,7 +285,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); + + // reset the flow control flag + // on opening channel, broker sends flow blocked if virtual host is blocked + // if virtual host is not blocked, then broker does not send flow command + // that's why we need to reset the flow control flag + s.setFlowControl(true); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted()); + s.resubscribe(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index e3d0b8bdbf..62b6c784e2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -20,6 +20,9 @@ package org.apache.qpid.client.failover; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; @@ -44,7 +47,9 @@ import javax.naming.NamingException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -831,6 +836,88 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio } } } + public void testFlowControlFlagResetOnFailover() throws Exception + { + // we do not need the connection failing to second broker + _connection.close(); + + // make sure that failover timeout is bigger than flow control timeout + setTestSystemProperty("qpid.failover_method_timeout", "60000"); + setTestSystemProperty("qpid.flow_control_wait_failure", "10000"); + + AMQConnection connection = null; + try + { + connection = createConnectionWithFailover(); + + final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2); + final AtomicInteger counter = new AtomicInteger(); + // try to send 5 messages (should block after 4) + new Thread(new Runnable() + { + @Override + public void run() + { + try + { + MessageProducer producer = producerSession.createProducer(queue); + for (int i=0; i < 5; i++) + { + Message next = createNextMessage(producerSession, i); + producer.send(next); + producerSession.commit(); + counter.incrementAndGet(); + } + } + catch(Exception e) + { + // ignore + } + } + }).start(); + + long limit= 30000l; + long start = System.currentTimeMillis(); + + // wait until session is blocked + while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit) + { + Thread.sleep(100l); + } + + assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked()); + assertEquals("Unexpected number of sent messages", 4, counter.get()); + + killBroker(); + startBroker(); + + // allows the failover thread to proceed + Thread.yield(); + awaitForFailoverCompletion(60000l); + + assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked()); + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception + { + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity", capacity); + arguments.put("x-qpid-flow-resume-capacity", resumeCapacity); + ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments); + Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true + + "'&autodelete='" + true + "'"); + ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue); + return queue; + } private AMQConnection createConnectionWithFailover() throws NamingException, JMSException { diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 3ad8891061..e34cd6b694 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -57,8 +57,10 @@ org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* // These tests test the behaviour of 0-8..-0-9-1 specific system property qpid.failover_method_timeout org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutExpires org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutReconnected +org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOnFailover org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutExpires org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutReconnected +org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowControlFlagResetOnFailover // QPID-3604: Immediate Prefetch no longer supported by 0-10 org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener |
