diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-24 08:45:43 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-24 08:45:43 +0000 |
| commit | 039ef144769bdff486273fc088db762297091db1 (patch) | |
| tree | 94ff2f4cf94eb678228828294f3a15013ccc8eb9 /java/client | |
| parent | 79882b848cdcec28b43f383c20651f7fe95a94c6 (diff) | |
| download | qpid-python-039ef144769bdff486273fc088db762297091db1.tar.gz | |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569298 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
5 files changed, 41 insertions, 45 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index c2e8b4a482..75ce2c326e 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -96,7 +96,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * Nether exceed MAX_MESSAGE_TRANSFERRED */ private int _messageAsyncrhonouslyReceived = 0; - + private AtomicBoolean _messageReceived = new AtomicBoolean(); //----- Constructors @@ -109,12 +109,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @param noLocal If true inhibits the delivery of messages published by its own connection. * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber. * If this value is null, a non-durable subscription is created. + * @param consumerTag This consumer ID. * @throws Exception If the MessageProducerImpl cannot be created due to some internal error. */ protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector, - boolean noLocal, String subscriptionName,String consumerTag) throws Exception + boolean noLocal, String subscriptionName, String consumerTag) throws Exception { - super(session, destination,consumerTag); + super(session, destination, consumerTag); if (messageSelector != null) { _messageSelector = messageSelector; @@ -183,13 +184,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // set the flow mode getSession().getQpidSession() .messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT); - + // this will prevent the broker from sending more than one message // When a messageListener is set the flow will be adjusted. // until then we assume it's for synchronous message consumption getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); - + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + getSession().getQpidSession().sync(); // check for an exception if (getSession().getCurrentException() != null) @@ -355,12 +356,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { checkNotClosed(); Message result = null; - + if (_messageListener != null) { throw new javax.jms.IllegalStateException("A listener has already been set."); } - + if (_incomingMessage != null) { System.out.println("We already had a message in the queue"); @@ -368,7 +369,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer _incomingMessage = null; return result; } - + synchronized (_incomingMessageLock) { // This indicate to the delivery thread to deliver the message to this consumer @@ -383,13 +384,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().getQpidSession().messageFlush(getMessageActorID()); _messageReceived.set(false); System.out.println("no message in the queue, issuing a flow(1) and waiting for message"); - + //When sync() returns we know whether we have received a message or not. getSession().getQpidSession().sync(); - - System.out.println("we got returned from sync()"); + + System.out.println("we got returned from sync()"); //received = getSession().getQpidSession().messagesReceived(); - } + } if (_messageReceived.get() && timeout < 0) { // this is a nowait and we havent received a message then we must immediatly return @@ -406,7 +407,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { try { - System.out.println("waiting for message"); + System.out.println("waiting for message"); _incomingMessageLock.wait(timeout); } catch (InterruptedException e) @@ -427,7 +428,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // We now release any message received for this consumer _isReceiving = false; _isNoWaitIsReceiving = false; - getSession().testQpidException(); + getSession().testQpidException(); } return result; } @@ -500,7 +501,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer if (_messageListener == null) { System.out.println("Received a message- onMessage in message consumer Impl"); - + synchronized (_incomingMessageLock) { if (messageOk) @@ -510,7 +511,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer if (_isReceiving) { System.out.println("Received a message- onMessage in message _isReceiving"); - + _incomingMessage = message; _incomingMessageLock.notify(); } @@ -534,11 +535,11 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); getSession().getQpidSession().messageFlush(getMessageActorID()); _messageReceived.set(false); - + // When sync() returns we know whether we have received a message or not. - getSession().getQpidSession().sync(); - - if (_messageReceived.get() && _isNoWaitIsReceiving) + getSession().getQpidSession().sync(); + + if (_messageReceived.get() && _isNoWaitIsReceiving) { // Right a message nowait is waiting for a message // but no one can be delivered it then need to return @@ -619,7 +620,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer RangeSet ranges = new RangeSet(); ranges.add(message.getMessageTransferId()); getSession().getQpidSession().messageRelease(ranges); - getSession().testQpidException(); + getSession().testQpidException(); } } @@ -667,7 +668,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().testQpidException(); } } - + public void notifyMessageReceived() { _messageReceived.set(true); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java index 68dafc7345..ecf6c796d3 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java @@ -70,17 +70,6 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); - try - { - ByteBuffer b = message.readData(); - byte[] a = new byte[b.limit()]; - b.get(a); - _dataIn = new DataInputStream(new ByteArrayInputStream(a)); - } - catch(Exception e) - { - e.printStackTrace(); - } } //--- BytesMessage API diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java index 9c4387d8b0..11bcec28ea 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java @@ -562,8 +562,8 @@ public class MapMessageImpl extends MessageImpl implements MapMessage //-- Overwritten methods /** * This method is invoked before this message is dispatched. - * <p>This class uses it to convert its text payload into a ByteBuffer */ + @Override public void beforeMessageDispatch() throws QpidException { try diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java index 1bd93d792d..c6d707e563 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java @@ -83,7 +83,7 @@ public class MessageImpl extends QpidMessage implements Message { super(message); } - + //---- javax.jms.Message interface /** * Get the message ID. diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index 4ca39cfcec..1916051c4c 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -31,11 +31,18 @@ import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; import org.apache.qpidity.ReplyTo; import org.apache.qpidity.client.util.ByteBufferMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class QpidMessage { /** + * this QpidMessage's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(QpidMessage.class); + + /** * The underlying qpidity message */ private org.apache.qpidity.api.Message _qpidityMessage; @@ -326,8 +333,8 @@ public class QpidMessage * @param messageBody The buffer containing this message data */ protected void setMessageData(ByteBuffer messageBody) - { - _messageData = messageBody.duplicate(); + { + _messageData = messageBody; // we shouldn't need that .duplicate(); } /** @@ -388,13 +395,12 @@ public class QpidMessage try { // set the message data - _qpidityMessage.clearData(); - // we need to do a flip - //_messageData.flip(); - - System.out.println("_messageData POS " + _messageData.position()); - System.out.println("_messageData limit " + _messageData.limit()); - + _qpidityMessage.clearData(); + if (_logger.isDebugEnabled()) + { + _logger.debug("_messageData POS " + _messageData.position()); + _logger.debug("_messageData limit " + _messageData.limit()); + } _qpidityMessage.appendData(_messageData); _qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties); } |
