From 949fe96882fe0250b69bd220288f22aa6407fee4 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 8 Jul 2008 09:07:37 +0000 Subject: QPID-293 allow messages which have been received by the consumer before a message listener has been set to be delivered. BasicMessageConsumer.java: If there are messages on the synchronous queue when a message listener is set, deliver them to it since they can no longer be consumed(). MessageListenerTest.java: Uncomment code that will make the test fail and demonstrate the bug git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@674747 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/client/BasicMessageConsumer.java | 8 ++++++++ .../src/test/java/org/apache/qpid/client/MessageListenerTest.java | 8 ++++---- 2 files changed, 12 insertions(+), 4 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 7ce81aeea2..41880ee11e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -277,6 +277,14 @@ public abstract class BasicMessageConsumer extends Closeable implements Me _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDispatcherIfNecessary(); + + // If we already have messages on the queue, deliver them to the listener + Object o = _synchronousQueue.poll(); + while (o != null) + { + messageListener.onMessage((Message) o); + o = _synchronousQueue.poll(); + } } } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 12b84b1495..3b7302df62 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -132,16 +132,16 @@ public class MessageListenerTest extends QpidTestCase implements MessageListener } - public void testRecieveTheUseMessageListener() throws Exception + public void testRecieveThenUseMessageListener() throws Exception { _logger.error("Test disabled as initial receive is not called first"); // Perform initial receive to start connection - // assertTrue(_consumer.receive(2000) != null); - // receivedCount++; + assertTrue(_consumer.receive(2000) != null); + receivedCount++; // Sleep to ensure remaining 4 msgs end up on _synchronousQueue - // Thread.sleep(1000); + Thread.sleep(1000); // Set the message listener and wait for the messages to come in. _consumer.setMessageListener(this); -- cgit v1.2.1