summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java18
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java59
2 files changed, 49 insertions, 28 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index bddbc329ab..ee7fc533a3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -22,10 +22,11 @@ package org.apache.qpid.client.util;
import java.util.Iterator;
import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
@@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class FlowControllingBlockingQueue
{
+ private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
+
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final Queue _queue = new ConcurrentLinkedQueue();
@@ -46,6 +49,8 @@ public class FlowControllingBlockingQueue
/** We require a separate count so we can track whether we have reached the threshold */
private int _count;
+
+ private boolean disableFlowControl;
public boolean isEmpty()
{
@@ -69,6 +74,10 @@ public class FlowControllingBlockingQueue
_flowControlHighThreshold = highThreshold;
_flowControlLowThreshold = lowThreshold;
_listener = listener;
+ if (highThreshold == 0)
+ {
+ disableFlowControl = true;
+ }
}
public Object take() throws InterruptedException
@@ -84,7 +93,7 @@ public class FlowControllingBlockingQueue
}
}
}
- if (_listener != null)
+ if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{
@@ -93,6 +102,7 @@ public class FlowControllingBlockingQueue
_listener.underThreshold(_count);
}
}
+
}
return o;
@@ -106,7 +116,7 @@ public class FlowControllingBlockingQueue
notifyAll();
}
- if (_listener != null)
+ if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index c5cdb83bbf..2a44413ac8 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -51,7 +51,13 @@ import javax.jms.TopicSubscriber;
public class DurableSubscriptionTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
+
+ /** Timeout for receive() if we are expecting a message */
+ private static final long POSITIVE_RECEIVE_TIMEOUT = 2000;
+
+ /** Timeout for receive() if we are not expecting a message */
+ private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+
public void testUnsubscribe() throws Exception
{
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -76,16 +82,18 @@ public class DurableSubscriptionTest extends QpidTestCase
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- _logger.info("Receive message on consumer 1:expecting A");
- msg = consumer2.receive();
+ _logger.info("Receive message on consumer 2:expecting A");
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
@@ -96,14 +104,15 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
_logger.info("Receive message on consumer 2 :expecting null");
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
_logger.info("Close connection");
@@ -143,14 +152,16 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session1.createTextMessage("A"));
Message msg;
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- msg = consumer2.receive();
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
consumer2.close();
@@ -220,8 +231,8 @@ public class DurableSubscriptionTest extends QpidTestCase
msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
- msg = consumer2.receive();
- assertNotNull(msg);
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
msg = consumer2.receive(500);
assertNull("There should be no more messages for consumption on consumer2.", msg);
@@ -235,10 +246,10 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session0.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
// Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
@@ -296,7 +307,7 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive();
+ Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -331,7 +342,7 @@ public class DurableSubscriptionTest extends QpidTestCase
assertNotNull("Subscriber should have been created", liveSubscriber);
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive();
+ Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -360,13 +371,13 @@ public class DurableSubscriptionTest extends QpidTestCase
// Send 1 matching message and 1 non-matching message
sendMatchingAndNonMatchingMessage(session, producer);
- Message rMsg = subA.receive(1000);
+ Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector1",
((TextMessage) rMsg).getText());
- rMsg = subA.receive(1000);
+ rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
// Disconnect subscriber
@@ -379,13 +390,13 @@ public class DurableSubscriptionTest extends QpidTestCase
// Check messages are recieved properly
sendMatchingAndNonMatchingMessage(session, producer);
- rMsg = subB.receive(1000);
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector2",
((TextMessage) rMsg).getText());
- rMsg = subB.receive(1000);
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
session.unsubscribe("testResubscribeWithChangedSelector");
}
@@ -429,5 +440,5 @@ public class DurableSubscriptionTest extends QpidTestCase
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DurableSubscriptionTest.class);
- }
+ }
}