summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-24 08:45:43 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-24 08:45:43 +0000
commit039ef144769bdff486273fc088db762297091db1 (patch)
tree94ff2f4cf94eb678228828294f3a15013ccc8eb9 /java/client
parent79882b848cdcec28b43f383c20651f7fe95a94c6 (diff)
downloadqpid-python-039ef144769bdff486273fc088db762297091db1.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569298 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java47
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java11
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java24
5 files changed, 41 insertions, 45 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index c2e8b4a482..75ce2c326e 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -96,7 +96,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
private int _messageAsyncrhonouslyReceived = 0;
-
+
private AtomicBoolean _messageReceived = new AtomicBoolean();
//----- Constructors
@@ -109,12 +109,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* @param noLocal If true inhibits the delivery of messages published by its own connection.
* @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
* If this value is null, a non-durable subscription is created.
+ * @param consumerTag This consumer ID.
* @throws Exception If the MessageProducerImpl cannot be created due to some internal error.
*/
protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
- boolean noLocal, String subscriptionName,String consumerTag) throws Exception
+ boolean noLocal, String subscriptionName, String consumerTag) throws Exception
{
- super(session, destination,consumerTag);
+ super(session, destination, consumerTag);
if (messageSelector != null)
{
_messageSelector = messageSelector;
@@ -183,13 +184,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
-
+
// this will prevent the broker from sending more than one message
// When a messageListener is set the flow will be adjusted.
// until then we assume it's for synchronous message consumption
getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
-
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+
getSession().getQpidSession().sync();
// check for an exception
if (getSession().getCurrentException() != null)
@@ -355,12 +356,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
checkNotClosed();
Message result = null;
-
+
if (_messageListener != null)
{
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
-
+
if (_incomingMessage != null)
{
System.out.println("We already had a message in the queue");
@@ -368,7 +369,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
_incomingMessage = null;
return result;
}
-
+
synchronized (_incomingMessageLock)
{
// This indicate to the delivery thread to deliver the message to this consumer
@@ -383,13 +384,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().getQpidSession().messageFlush(getMessageActorID());
_messageReceived.set(false);
System.out.println("no message in the queue, issuing a flow(1) and waiting for message");
-
+
//When sync() returns we know whether we have received a message or not.
getSession().getQpidSession().sync();
-
- System.out.println("we got returned from sync()");
+
+ System.out.println("we got returned from sync()");
//received = getSession().getQpidSession().messagesReceived();
- }
+ }
if (_messageReceived.get() && timeout < 0)
{
// this is a nowait and we havent received a message then we must immediatly return
@@ -406,7 +407,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
try
{
- System.out.println("waiting for message");
+ System.out.println("waiting for message");
_incomingMessageLock.wait(timeout);
}
catch (InterruptedException e)
@@ -427,7 +428,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// We now release any message received for this consumer
_isReceiving = false;
_isNoWaitIsReceiving = false;
- getSession().testQpidException();
+ getSession().testQpidException();
}
return result;
}
@@ -500,7 +501,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
if (_messageListener == null)
{
System.out.println("Received a message- onMessage in message consumer Impl");
-
+
synchronized (_incomingMessageLock)
{
if (messageOk)
@@ -510,7 +511,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
if (_isReceiving)
{
System.out.println("Received a message- onMessage in message _isReceiving");
-
+
_incomingMessage = message;
_incomingMessageLock.notify();
}
@@ -534,11 +535,11 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
_messageReceived.set(false);
-
+
// When sync() returns we know whether we have received a message or not.
- getSession().getQpidSession().sync();
-
- if (_messageReceived.get() && _isNoWaitIsReceiving)
+ getSession().getQpidSession().sync();
+
+ if (_messageReceived.get() && _isNoWaitIsReceiving)
{
// Right a message nowait is waiting for a message
// but no one can be delivered it then need to return
@@ -619,7 +620,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
RangeSet ranges = new RangeSet();
ranges.add(message.getMessageTransferId());
getSession().getQpidSession().messageRelease(ranges);
- getSession().testQpidException();
+ getSession().testQpidException();
}
}
@@ -667,7 +668,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().testQpidException();
}
}
-
+
public void notifyMessageReceived()
{
_messageReceived.set(true);
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
index 68dafc7345..ecf6c796d3 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
@@ -70,17 +70,6 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException
{
super(message);
- try
- {
- ByteBuffer b = message.readData();
- byte[] a = new byte[b.limit()];
- b.get(a);
- _dataIn = new DataInputStream(new ByteArrayInputStream(a));
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
}
//--- BytesMessage API
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
index 9c4387d8b0..11bcec28ea 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
@@ -562,8 +562,8 @@ public class MapMessageImpl extends MessageImpl implements MapMessage
//-- Overwritten methods
/**
* This method is invoked before this message is dispatched.
- * <p>This class uses it to convert its text payload into a ByteBuffer
*/
+ @Override
public void beforeMessageDispatch() throws QpidException
{
try
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
index 1bd93d792d..c6d707e563 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
@@ -83,7 +83,7 @@ public class MessageImpl extends QpidMessage implements Message
{
super(message);
}
-
+
//---- javax.jms.Message interface
/**
* Get the message ID.
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
index 4ca39cfcec..1916051c4c 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
@@ -31,11 +31,18 @@ import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.ReplyTo;
import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class QpidMessage
{
/**
+ * this QpidMessage's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(QpidMessage.class);
+
+ /**
* The underlying qpidity message
*/
private org.apache.qpidity.api.Message _qpidityMessage;
@@ -326,8 +333,8 @@ public class QpidMessage
* @param messageBody The buffer containing this message data
*/
protected void setMessageData(ByteBuffer messageBody)
- {
- _messageData = messageBody.duplicate();
+ {
+ _messageData = messageBody; // we shouldn't need that .duplicate();
}
/**
@@ -388,13 +395,12 @@ public class QpidMessage
try
{
// set the message data
- _qpidityMessage.clearData();
- // we need to do a flip
- //_messageData.flip();
-
- System.out.println("_messageData POS " + _messageData.position());
- System.out.println("_messageData limit " + _messageData.limit());
-
+ _qpidityMessage.clearData();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("_messageData POS " + _messageData.position());
+ _logger.debug("_messageData limit " + _messageData.limit());
+ }
_qpidityMessage.appendData(_messageData);
_qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties);
}