summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-24 15:10:23 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-24 15:10:23 +0000
commitd13157eca83778f61e59c04257a47c04c8932f0e (patch)
tree6cdb29552391f7ff845051f2ec4d52f5e69c056b /java/client/src
parent039ef144769bdff486273fc088db762297091db1 (diff)
downloadqpid-python-d13157eca83778f61e59c04257a47c04c8932f0e.tar.gz
updated consumer thread
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java133
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java71
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java11
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java1
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java1
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java1
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java13
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java1
8 files changed, 118 insertions, 114 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 75ce2c326e..bc60ef94a0 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -87,11 +87,6 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
private boolean _isReceiving = false;
/**
- * Indicates that a nowait is receiving a message.
- */
- private boolean _isNoWaitIsReceiving = false;
-
- /**
* Number of mesages received asynchronously
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
@@ -109,7 +104,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* @param noLocal If true inhibits the delivery of messages published by its own connection.
* @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
* If this value is null, a non-durable subscription is created.
- * @param consumerTag This consumer ID.
+ * @param consumerTag Thi actor ID
* @throws Exception If the MessageProducerImpl cannot be created due to some internal error.
*/
protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
@@ -362,34 +357,18 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
- if (_incomingMessage != null)
- {
- System.out.println("We already had a message in the queue");
- result = (Message) _incomingMessage;
- _incomingMessage = null;
- return result;
- }
-
synchronized (_incomingMessageLock)
{
// This indicate to the delivery thread to deliver the message to this consumer
// as it can happens that a message is delivered after a receive operation as returned.
_isReceiving = true;
+ boolean blockingReceived = timeout == 0;
if (!_isStopped)
{
// if this consumer is stopped then this will be call when starting
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
- getSession().getQpidSession().messageFlush(getMessageActorID());
- _messageReceived.set(false);
- System.out.println("no message in the queue, issuing a flow(1) and waiting for message");
-
+ requestOneMessage();
//When sync() returns we know whether we have received a message or not.
getSession().getQpidSession().sync();
-
- System.out.println("we got returned from sync()");
- //received = getSession().getQpidSession().messagesReceived();
}
if (_messageReceived.get() && timeout < 0)
{
@@ -398,42 +377,81 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
}
else
{
- // right we need to let onMessage know that a nowait is potentially waiting for a message
- if (timeout < 0)
- {
- _isNoWaitIsReceiving = true;
- }
- while (_incomingMessage == null && !_isClosed)
+ boolean messageReceived = false;
+ while (!messageReceived)
{
- try
+ long timeBeforeWait = 0;
+ while (_incomingMessage == null && !_isClosed)
{
- System.out.println("waiting for message");
- _incomingMessageLock.wait(timeout);
+ if (!blockingReceived)
+ {
+ timeBeforeWait = System.currentTimeMillis();
+ }
+ try
+ {
+ _incomingMessageLock.wait(timeout);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
}
- catch (InterruptedException e)
+ if (_incomingMessage != null)
{
- // do nothing
+ result = (Message) _incomingMessage;
+ // tell the session that a message is inprocess
+ getSession().preProcessMessage(_incomingMessage);
+ // tell the session to acknowledge this message (if required)
+ getSession().acknowledgeMessage(_incomingMessage);
+ _incomingMessage.afterMessageReceive();
+ messageReceived = true;
+ }
+ else
+ {
+ //now setup the new timeout
+ if (!blockingReceived)
+ {
+ timeout = timeout - (System.currentTimeMillis() - timeBeforeWait);
+ }
+ if (!_isClosed)
+ {
+ // we need to request a new message
+ requestOneMessage();
+ getSession().getQpidSession().sync();
+ if (_messageReceived.get() && timeout < 0)
+ {
+ // we are waiting for too long and we haven't received a proper message
+ result = null;
+ messageReceived = true;
+ }
+ }
}
- }
- if (_incomingMessage != null)
- {
- result = (Message) _incomingMessage;
- // tell the session that a message is inprocess
- getSession().preProcessMessage(_incomingMessage);
- // tell the session to acknowledge this message (if required)
- getSession().acknowledgeMessage(_incomingMessage);
}
_incomingMessage = null;
}
// We now release any message received for this consumer
_isReceiving = false;
- _isNoWaitIsReceiving = false;
getSession().testQpidException();
}
return result;
}
/**
+ * Request a single message
+ */
+ private void requestOneMessage()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Requesting a single message");
+ }
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ _messageReceived.set(false);
+ }
+
+ /**
* Stop the delivery of messages to this consumer.
* <p>For asynchronous receiver, this operation blocks until the message listener
* finishes processing the current message,
@@ -500,24 +518,22 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// notify the waiting thread
if (_messageListener == null)
{
- System.out.println("Received a message- onMessage in message consumer Impl");
-
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Received a message- onMessage in message consumer Impl");
+ }
synchronized (_incomingMessageLock)
{
if (messageOk)
{
- System.out.println("Received a message- onMessage in message ok " + messageOk);
// we have received a proper message that we can deliver
if (_isReceiving)
{
- System.out.println("Received a message- onMessage in message _isReceiving");
-
_incomingMessage = message;
_incomingMessageLock.notify();
}
else
{
- System.out.println("Received a message- onMessage in message releasing");
// this message has been received after a received as returned
// we need to release it
releaseMessage(message);
@@ -530,21 +546,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// then we need to request a new one from the server
if (_isReceiving)
{
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(),
- org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
- getSession().getQpidSession().messageFlush(getMessageActorID());
- _messageReceived.set(false);
-
- // When sync() returns we know whether we have received a message or not.
- getSession().getQpidSession().sync();
-
- if (_messageReceived.get() && _isNoWaitIsReceiving)
- {
- // Right a message nowait is waiting for a message
- // but no one can be delivered it then need to return
- _incomingMessageLock.notify();
- }
+ _incomingMessageLock.notify();
}
}
}
@@ -582,6 +584,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
**/
try
{
+ message.afterMessageReceive();
_messageListener.onMessage((Message) message);
}
catch (RuntimeException re)
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
index 57f5da241a..4dbf86a388 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
@@ -17,13 +17,10 @@
*/
package org.apache.qpidity.jms;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.jms.message.MessageFactory;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.util.MessageListener;
-import org.apache.qpidity.jms.message.MessageFactory;
-import org.apache.qpidity.jms.message.QpidMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,12 +29,8 @@ import org.slf4j.LoggerFactory;
* This is for guarantying that asynch messages are sequentially processed within their session.
* <p> when used synchonously, messages are dispatched to the receiver itself.
*/
-public class QpidMessageListener implements MessageListener, Runnable
+public class QpidMessageListener implements MessageListener
{
-
- // temp solution
- LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>();
-
/**
* Used for debugging.
*/
@@ -57,35 +50,32 @@ public class QpidMessageListener implements MessageListener, Runnable
public QpidMessageListener(MessageConsumerImpl consumer)
{
_consumer = consumer;
- Thread t = new Thread(this);
- t.start();
}
-
- public void run()
+
+ //---- org.apache.qpidity.MessagePartListener API
+ /**
+ * Deliver a message to the listener.
+ *
+ * @param message The message delivered to the listner.
+ */
+ public void onMessage(Message message)
{
try
{
- while(true)
+ // to be used with flush
+ _consumer.notifyMessageReceived();
+
+ //convert this message into a JMS one
+ QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+ // if consumer is asynchronous then send this message to its session.
+ if( _consumer.getMessageListener() != null )
{
- System.out.println("trying to take a message message");
- Message message = _queue.take();
-
- // to be used with flush
- System.out.println("processing the message");
- _consumer.notifyMessageReceived();
-
- //convert this message into a JMS one
- QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
- // if consumer is asynchronous then send this message to its session.
- if( _consumer.getMessageListener() != null )
- {
- _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
- }
- else
- {
- // deliver this message to the consumer itself
- _consumer.onMessage(jmsMessage);
- }
+ _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
+ }
+ else
+ {
+ // deliver this message to the consumer itself
+ _consumer.onMessage(jmsMessage);
}
}
catch (Exception e)
@@ -93,17 +83,4 @@ public class QpidMessageListener implements MessageListener, Runnable
throw new RuntimeException(e.getMessage());
}
}
-
- //---- org.apache.qpidity.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
- {
- System.out.println("Received a message");
- _queue.offer(message);
- System.out.println("Added queue to the message");
- }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
index ecf6c796d3..ce6b14fac6 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
@@ -827,13 +827,22 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
*
* @throws QpidException
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
super.afterMessageReceive();
ByteBuffer messageData = getMessageData();
if (messageData != null)
{
- _dataIn = new DataInputStream(new ByteArrayInputStream(messageData.array()));
+ try
+ {
+ _dataIn = new DataInputStream(
+ new ByteArrayInputStream(messageData.array(), messageData.arrayOffset(), messageData.limit()));
+ }
+ catch (Exception e)
+ {
+ throw new QpidException("Cannot retrieve data from message ", null, e);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
index 11bcec28ea..7e7429f8ab 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
@@ -585,6 +585,7 @@ public class MapMessageImpl extends MessageImpl implements MapMessage
/**
* This method is invoked after this message has been received.
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
super.afterMessageReceive();
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
index c6d707e563..e1e57d9441 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
@@ -887,6 +887,7 @@ public class MessageImpl extends QpidMessage implements Message
*
* @throws QpidException If there is an internal error when procesing this message.
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
// recreate a destination object for the encoded destination
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
index 5878fc8c34..a981fc2846 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
@@ -149,6 +149,7 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
*
* @throws QpidException
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
super.afterMessageReceive();
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
index 1916051c4c..c11a7d8c3b 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
@@ -395,7 +395,7 @@ public class QpidMessage
try
{
// set the message data
- _qpidityMessage.clearData();
+ _qpidityMessage.clearData();
if (_logger.isDebugEnabled())
{
_logger.debug("_messageData POS " + _messageData.position());
@@ -429,6 +429,17 @@ public class QpidMessage
{
return _qpidityMessage.getMessageTransferId();
}
+
+ /**
+ * This method is invoked after this message is received.
+ *
+ * @throws QpidException If there is an internal error when procesing this message.
+ */
+ public void afterMessageReceive() throws QpidException
+ {
+ // do nothing for now
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
index e894c33682..c1afc06502 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
@@ -110,6 +110,7 @@ public class TextMessageImpl extends MessageImpl implements TextMessage
/**
* This method is invoked after this message has been received.
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
super.afterMessageReceive();