diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-24 13:22:02 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-24 13:22:02 +0000 |
| commit | 34033a7d3b6014e2a06077b14c257f1f58732f67 (patch) | |
| tree | 4132c38c0e59015ba9818908dab5cd47b3a7080d /java/client/src | |
| parent | b233694a747df9791f6c3bc62b3007248753397c (diff) | |
| download | qpid-python-34033a7d3b6014e2a06077b14c257f1f58732f67.tar.gz | |
Changed to handle sync receive when connection is started
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@587889 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
3 files changed, 45 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a3e2a8fa3a..223ba7b510 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -556,7 +556,10 @@ public class AMQSession_0_10 extends AMQSession { super.suspendChannel(false); - + for(BasicMessageConsumer c: _consumers.values()) + { + c.start(); + } // If the event dispatcher is not running then start it too. if (hasMessageListeners()) { @@ -564,6 +567,15 @@ public class AMQSession_0_10 extends AMQSession } } + void stop() throws AMQException + { + super.stop(); + for(BasicMessageConsumer c: _consumers.values()) + { + c.stop(); + } + } + synchronized void startDistpatcherIfNecessary() { // If IMMEDIATE_PREFETCH is not set then we need to start fetching diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 8cfc5402bb..dc02489731 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -958,4 +958,16 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me { _synchronousQueue.clear(); } + + + public void start() + { + // do nothing as this is a 0_10 feature + } + + + public void stop() + { + // do nothing as this is a 0_10 feature + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index fbfc9d80ec..8958056a7a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -79,6 +79,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ private boolean _preAcquire = true; + /** + * Indicate whether this consumer is started. + */ + private boolean _isStarted = false; + //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -105,6 +110,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _preAcquire = false; } } + _isStarted = connection.started(); } // ----- Interface org.apache.qpidity.client.util.MessageListener @@ -449,6 +455,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By public Object getMessageFromQueue(long l) throws InterruptedException { + if( !_isStarted ) + { + return null; + } Object o; _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); @@ -497,4 +507,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { } + + public void start() + { + _isStarted = true; + } + + public void stop() + { + _isStarted = false; + } }
\ No newline at end of file |
