diff options
| author | Keith Wall <kwall@apache.org> | 2012-03-19 14:00:37 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-03-19 14:00:37 +0000 |
| commit | e9a23033bb075f50b0a46c9366012e30538a4e54 (patch) | |
| tree | ca952b4b7109cea8ff7ca7896f708c525ec3410a /qpid/java/systests | |
| parent | 617810d8db1359d81688be587908cebe77d3f559 (diff) | |
| download | qpid-python-e9a23033bb075f50b0a46c9366012e30538a4e54.tar.gz | |
QPID-3895: Remove blocked channel/session from the list of blocked channels on channel/session close
This patch adds the fllowing:
- fixes AMQChannel to stop sending flow commands if channel is closing
- fixes AMQChannel#compareTo ServerSession#compareTo
- removes AMQSessionModel#getID() method from AMQChannel and Server session in order to avoid confusions
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1302455 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java | 36 |
1 files changed, 34 insertions, 2 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index ad8c856a74..13053d02df 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -357,13 +357,45 @@ public class ProducerFlowControlTest extends AbstractTestLogging consumer.receive(); } + public void testQueueDeleteWithBlockedFlow() throws Exception + { + String queueName = getTestQueueName(); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800, true, false); + + producer = producerSession.createProducer(queue); + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + // close blocked producer session and connection + producerConnection.close(); + + // delete queue with a consumer session + ((AMQSession<?,?>) consumerSession).sendQueueDelete(new AMQShortString(queueName)); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message message = consumer.receive(1000l); + assertNull("Unexpected message", message); + } + private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception { + createAndBindQueueWithFlowControlEnabled(session, queueName, capacity, resumeCapacity, false, true); + } + + private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception + { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",capacity); arguments.put("x-qpid-flow-resume-capacity",resumeCapacity); - ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), autoDelete, durable, false, arguments); + queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='" + durable + "'&autodelete='" + autoDelete + "'"); ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); } |
