diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-04-19 15:08:04 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-19 15:08:04 +0000 |
| commit | f97c8a999f3b62fa8212a8fd102ea31662bc00d3 (patch) | |
| tree | 86acb965b4c6c3284e82d3d797229365093bbd16 /java/client/src | |
| parent | 281b4220b2c53c430542a152f1d925c4cc077305 (diff) | |
| download | qpid-python-f97c8a999f3b62fa8212a8fd102ea31662bc00d3.tar.gz | |
QPID-455 Pre-fetched messages can cause problems with client tools. Set IMMEDIATE_PREFETCH="true" for previous behaviour.
Inverted check now setting System proprety IMMEDIATE_PREFETCH="true" will cause existing messages to be immediately pre-fetched to the newly registered consumer.
Solved out standing broker issues see QPID-458 and QPID-459.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530442 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8bb5b622f7..8325fa4dd1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -199,11 +199,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final Object _suspensionLock = new Object(); - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */ + private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); + + /** System property to enable immediate message prefetching */ + private static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; + /** Immediate message prefetch default */ + private static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); - private AtomicBoolean _firstDispatcher = new AtomicBoolean(true); + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { @@ -1932,20 +1939,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { - if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + // If IMMEDIATE_PREFETCH is not set then we need to start fetching + if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT))) { -// if (!connectionStopped) + // We do this now if this is the first call on a started connection + if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false)) { - if (isSuspended() && _firstDispatcher.getAndSet(false)) + try { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - _logger.info("Suspending channel threw an exception:" + e); - } + suspendChannel(false); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); } } } @@ -1998,11 +2004,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - // The dispatcher will be null if we have just created this session - // so suspend the channel before we register our consumer so that we don't - // start prefetching until a receive/mListener is set. - if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch + if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT))) { + // The dispatcher will be null if we have just created this session + // so suspend the channel before we register our consumer so that we don't + // start prefetching until a receive/mListener is set. if (_dispatcher == null) { if (!isSuspended()) @@ -2010,6 +2017,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { suspendChannel(true); + _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); } catch (AMQException e) { @@ -2018,6 +2026,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } } + else + { + _logger.info("Immediately prefetching existing messages to new consumer."); + } try { |
