summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-19 15:08:04 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-19 15:08:04 +0000
commitf97c8a999f3b62fa8212a8fd102ea31662bc00d3 (patch)
tree86acb965b4c6c3284e82d3d797229365093bbd16 /java/client/src
parent281b4220b2c53c430542a152f1d925c4cc077305 (diff)
downloadqpid-python-f97c8a999f3b62fa8212a8fd102ea31662bc00d3.tar.gz
QPID-455 Pre-fetched messages can cause problems with client tools. Set IMMEDIATE_PREFETCH="true" for previous behaviour.
Inverted check now setting System proprety IMMEDIATE_PREFETCH="true" will cause existing messages to be immediately pre-fetched to the newly registered consumer. Solved out standing broker issues see QPID-458 and QPID-459. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530442 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java46
1 files changed, 29 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8bb5b622f7..8325fa4dd1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -199,11 +199,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final Object _suspensionLock = new Object();
- /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
+ /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */
+ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+
+ /** System property to enable immediate message prefetching */
+ private static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+ /** Immediate message prefetch default */
+ private static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
- private AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+ /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
@@ -1932,20 +1939,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ // If IMMEDIATE_PREFETCH is not set then we need to start fetching
+ if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
{
-// if (!connectionStopped)
+ // We do this now if this is the first call on a started connection
+ if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
{
- if (isSuspended() && _firstDispatcher.getAndSet(false))
+ try
{
- try
- {
- suspendChannel(false);
- }
- catch (AMQException e)
- {
- _logger.info("Suspending channel threw an exception:" + e);
- }
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
}
}
}
@@ -1998,11 +2004,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- // The dispatcher will be null if we have just created this session
- // so suspend the channel before we register our consumer so that we don't
- // start prefetching until a receive/mListener is set.
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
+ if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
{
+ // The dispatcher will be null if we have just created this session
+ // so suspend the channel before we register our consumer so that we don't
+ // start prefetching until a receive/mListener is set.
if (_dispatcher == null)
{
if (!isSuspended())
@@ -2010,6 +2017,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
suspendChannel(true);
+ _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
}
catch (AMQException e)
{
@@ -2018,6 +2026,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
}
+ else
+ {
+ _logger.info("Immediately prefetching existing messages to new consumer.");
+ }
try
{