diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-24 15:10:23 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-24 15:10:23 +0000 |
| commit | d13157eca83778f61e59c04257a47c04c8932f0e (patch) | |
| tree | 6cdb29552391f7ff845051f2ec4d52f5e69c056b /java/client/src | |
| parent | 039ef144769bdff486273fc088db762297091db1 (diff) | |
| download | qpid-python-d13157eca83778f61e59c04257a47c04c8932f0e.tar.gz | |
updated consumer thread
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
8 files changed, 118 insertions, 114 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 75ce2c326e..bc60ef94a0 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -87,11 +87,6 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer private boolean _isReceiving = false; /** - * Indicates that a nowait is receiving a message. - */ - private boolean _isNoWaitIsReceiving = false; - - /** * Number of mesages received asynchronously * Nether exceed MAX_MESSAGE_TRANSFERRED */ @@ -109,7 +104,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @param noLocal If true inhibits the delivery of messages published by its own connection. * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber. * If this value is null, a non-durable subscription is created. - * @param consumerTag This consumer ID. + * @param consumerTag Thi actor ID * @throws Exception If the MessageProducerImpl cannot be created due to some internal error. */ protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector, @@ -362,34 +357,18 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer throw new javax.jms.IllegalStateException("A listener has already been set."); } - if (_incomingMessage != null) - { - System.out.println("We already had a message in the queue"); - result = (Message) _incomingMessage; - _incomingMessage = null; - return result; - } - synchronized (_incomingMessageLock) { // This indicate to the delivery thread to deliver the message to this consumer // as it can happens that a message is delivered after a receive operation as returned. _isReceiving = true; + boolean blockingReceived = timeout == 0; if (!_isStopped) { // if this consumer is stopped then this will be call when starting - getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, - 1); - getSession().getQpidSession().messageFlush(getMessageActorID()); - _messageReceived.set(false); - System.out.println("no message in the queue, issuing a flow(1) and waiting for message"); - + requestOneMessage(); //When sync() returns we know whether we have received a message or not. getSession().getQpidSession().sync(); - - System.out.println("we got returned from sync()"); - //received = getSession().getQpidSession().messagesReceived(); } if (_messageReceived.get() && timeout < 0) { @@ -398,42 +377,81 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } else { - // right we need to let onMessage know that a nowait is potentially waiting for a message - if (timeout < 0) - { - _isNoWaitIsReceiving = true; - } - while (_incomingMessage == null && !_isClosed) + boolean messageReceived = false; + while (!messageReceived) { - try + long timeBeforeWait = 0; + while (_incomingMessage == null && !_isClosed) { - System.out.println("waiting for message"); - _incomingMessageLock.wait(timeout); + if (!blockingReceived) + { + timeBeforeWait = System.currentTimeMillis(); + } + try + { + _incomingMessageLock.wait(timeout); + } + catch (InterruptedException e) + { + // do nothing + } } - catch (InterruptedException e) + if (_incomingMessage != null) { - // do nothing + result = (Message) _incomingMessage; + // tell the session that a message is inprocess + getSession().preProcessMessage(_incomingMessage); + // tell the session to acknowledge this message (if required) + getSession().acknowledgeMessage(_incomingMessage); + _incomingMessage.afterMessageReceive(); + messageReceived = true; + } + else + { + //now setup the new timeout + if (!blockingReceived) + { + timeout = timeout - (System.currentTimeMillis() - timeBeforeWait); + } + if (!_isClosed) + { + // we need to request a new message + requestOneMessage(); + getSession().getQpidSession().sync(); + if (_messageReceived.get() && timeout < 0) + { + // we are waiting for too long and we haven't received a proper message + result = null; + messageReceived = true; + } + } } - } - if (_incomingMessage != null) - { - result = (Message) _incomingMessage; - // tell the session that a message is inprocess - getSession().preProcessMessage(_incomingMessage); - // tell the session to acknowledge this message (if required) - getSession().acknowledgeMessage(_incomingMessage); } _incomingMessage = null; } // We now release any message received for this consumer _isReceiving = false; - _isNoWaitIsReceiving = false; getSession().testQpidException(); } return result; } /** + * Request a single message + */ + private void requestOneMessage() + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Requesting a single message"); + } + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + getSession().getQpidSession().messageFlush(getMessageActorID()); + _messageReceived.set(false); + } + + /** * Stop the delivery of messages to this consumer. * <p>For asynchronous receiver, this operation blocks until the message listener * finishes processing the current message, @@ -500,24 +518,22 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // notify the waiting thread if (_messageListener == null) { - System.out.println("Received a message- onMessage in message consumer Impl"); - + if (_logger.isDebugEnabled()) + { + _logger.debug("Received a message- onMessage in message consumer Impl"); + } synchronized (_incomingMessageLock) { if (messageOk) { - System.out.println("Received a message- onMessage in message ok " + messageOk); // we have received a proper message that we can deliver if (_isReceiving) { - System.out.println("Received a message- onMessage in message _isReceiving"); - _incomingMessage = message; _incomingMessageLock.notify(); } else { - System.out.println("Received a message- onMessage in message releasing"); // this message has been received after a received as returned // we need to release it releaseMessage(message); @@ -530,21 +546,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // then we need to request a new one from the server if (_isReceiving) { - getSession().getQpidSession() - .messageFlow(getMessageActorID(), - org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - getSession().getQpidSession().messageFlush(getMessageActorID()); - _messageReceived.set(false); - - // When sync() returns we know whether we have received a message or not. - getSession().getQpidSession().sync(); - - if (_messageReceived.get() && _isNoWaitIsReceiving) - { - // Right a message nowait is waiting for a message - // but no one can be delivered it then need to return - _incomingMessageLock.notify(); - } + _incomingMessageLock.notify(); } } } @@ -582,6 +584,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer **/ try { + message.afterMessageReceive(); _messageListener.onMessage((Message) message); } catch (RuntimeException re) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java index 57f5da241a..4dbf86a388 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java @@ -17,13 +17,10 @@ */ package org.apache.qpidity.jms; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - +import org.apache.qpidity.jms.message.QpidMessage; +import org.apache.qpidity.jms.message.MessageFactory; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.util.MessageListener; -import org.apache.qpidity.jms.message.MessageFactory; -import org.apache.qpidity.jms.message.QpidMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,12 +29,8 @@ import org.slf4j.LoggerFactory; * This is for guarantying that asynch messages are sequentially processed within their session. * <p> when used synchonously, messages are dispatched to the receiver itself. */ -public class QpidMessageListener implements MessageListener, Runnable +public class QpidMessageListener implements MessageListener { - - // temp solution - LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>(); - /** * Used for debugging. */ @@ -57,35 +50,32 @@ public class QpidMessageListener implements MessageListener, Runnable public QpidMessageListener(MessageConsumerImpl consumer) { _consumer = consumer; - Thread t = new Thread(this); - t.start(); } - - public void run() + + //---- org.apache.qpidity.MessagePartListener API + /** + * Deliver a message to the listener. + * + * @param message The message delivered to the listner. + */ + public void onMessage(Message message) { try { - while(true) + // to be used with flush + _consumer.notifyMessageReceived(); + + //convert this message into a JMS one + QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); + // if consumer is asynchronous then send this message to its session. + if( _consumer.getMessageListener() != null ) { - System.out.println("trying to take a message message"); - Message message = _queue.take(); - - // to be used with flush - System.out.println("processing the message"); - _consumer.notifyMessageReceived(); - - //convert this message into a JMS one - QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); - // if consumer is asynchronous then send this message to its session. - if( _consumer.getMessageListener() != null ) - { - _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); - } - else - { - // deliver this message to the consumer itself - _consumer.onMessage(jmsMessage); - } + _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); + } + else + { + // deliver this message to the consumer itself + _consumer.onMessage(jmsMessage); } } catch (Exception e) @@ -93,17 +83,4 @@ public class QpidMessageListener implements MessageListener, Runnable throw new RuntimeException(e.getMessage()); } } - - //---- org.apache.qpidity.MessagePartListener API - /** - * Deliver a message to the listener. - * - * @param message The message delivered to the listner. - */ - public void onMessage(Message message) - { - System.out.println("Received a message"); - _queue.offer(message); - System.out.println("Added queue to the message"); - } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java index ecf6c796d3..ce6b14fac6 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java @@ -827,13 +827,22 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage * * @throws QpidException */ + @Override public void afterMessageReceive() throws QpidException { super.afterMessageReceive(); ByteBuffer messageData = getMessageData(); if (messageData != null) { - _dataIn = new DataInputStream(new ByteArrayInputStream(messageData.array())); + try + { + _dataIn = new DataInputStream( + new ByteArrayInputStream(messageData.array(), messageData.arrayOffset(), messageData.limit())); + } + catch (Exception e) + { + throw new QpidException("Cannot retrieve data from message ", null, e); + } } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java index 11bcec28ea..7e7429f8ab 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java @@ -585,6 +585,7 @@ public class MapMessageImpl extends MessageImpl implements MapMessage /** * This method is invoked after this message has been received. */ + @Override public void afterMessageReceive() throws QpidException { super.afterMessageReceive(); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java index c6d707e563..e1e57d9441 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java @@ -887,6 +887,7 @@ public class MessageImpl extends QpidMessage implements Message * * @throws QpidException If there is an internal error when procesing this message. */ + @Override public void afterMessageReceive() throws QpidException { // recreate a destination object for the encoded destination diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java index 5878fc8c34..a981fc2846 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java @@ -149,6 +149,7 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage * * @throws QpidException */ + @Override public void afterMessageReceive() throws QpidException { super.afterMessageReceive(); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index 1916051c4c..c11a7d8c3b 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -395,7 +395,7 @@ public class QpidMessage try { // set the message data - _qpidityMessage.clearData(); + _qpidityMessage.clearData(); if (_logger.isDebugEnabled()) { _logger.debug("_messageData POS " + _messageData.position()); @@ -429,6 +429,17 @@ public class QpidMessage { return _qpidityMessage.getMessageTransferId(); } + + /** + * This method is invoked after this message is received. + * + * @throws QpidException If there is an internal error when procesing this message. + */ + public void afterMessageReceive() throws QpidException + { + // do nothing for now + } + } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java index e894c33682..c1afc06502 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java @@ -110,6 +110,7 @@ public class TextMessageImpl extends MessageImpl implements TextMessage /** * This method is invoked after this message has been received. */ + @Override public void afterMessageReceive() throws QpidException { super.afterMessageReceive(); |
