summaryrefslogtreecommitdiff
path: root/qpid/java/systests
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-03-19 14:00:37 +0000
committerKeith Wall <kwall@apache.org>2012-03-19 14:00:37 +0000
commite9a23033bb075f50b0a46c9366012e30538a4e54 (patch)
treeca952b4b7109cea8ff7ca7896f708c525ec3410a /qpid/java/systests
parent617810d8db1359d81688be587908cebe77d3f559 (diff)
downloadqpid-python-e9a23033bb075f50b0a46c9366012e30538a4e54.tar.gz
QPID-3895: Remove blocked channel/session from the list of blocked channels on channel/session close
This patch adds the fllowing: - fixes AMQChannel to stop sending flow commands if channel is closing - fixes AMQChannel#compareTo ServerSession#compareTo - removes AMQSessionModel#getID() method from AMQChannel and Server session in order to avoid confusions Applied patch from Oleksandr Rudyy <orudyy@gmail.com>. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1302455 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java36
1 files changed, 34 insertions, 2 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index ad8c856a74..13053d02df 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -357,13 +357,45 @@ public class ProducerFlowControlTest extends AbstractTestLogging
consumer.receive();
}
+ public void testQueueDeleteWithBlockedFlow() throws Exception
+ {
+ String queueName = getTestQueueName();
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800, true, false);
+
+ producer = producerSession.createProducer(queue);
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ // close blocked producer session and connection
+ producerConnection.close();
+
+ // delete queue with a consumer session
+ ((AMQSession<?,?>) consumerSession).sendQueueDelete(new AMQShortString(queueName));
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message message = consumer.receive(1000l);
+ assertNull("Unexpected message", message);
+ }
+
private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
{
+ createAndBindQueueWithFlowControlEnabled(session, queueName, capacity, resumeCapacity, false, true);
+ }
+
+ private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception
+ {
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",capacity);
arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
- ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), autoDelete, durable, false, arguments);
+ queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='" + durable + "'&autodelete='" + autoDelete + "'");
((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
}