diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-24 04:06:18 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-24 04:06:18 +0000 |
| commit | 79882b848cdcec28b43f383c20651f7fe95a94c6 (patch) | |
| tree | b8d128471184593357db3a33689ad22282184002 /java/client | |
| parent | 1d87ab05c2fdff3caa57dbc27f1511541cabd7b8 (diff) | |
| download | qpid-python-79882b848cdcec28b43f383c20651f7fe95a94c6.tar.gz | |
Fixed the following issues
1) TopicImpl doesn't populate the routing key properly.
The Destination Impl needs to have a routing key field (I added the
field).
For Topic The queue name is generated.
For Queue the routingkey is same as queue name.
2) QpidMessage - Calling flip on messageData resets the limit to zero in
beforeMessageDispatch(). I commented out the flip()
3) QpidMessage - setMessageData
Instead of _messageData = messageBody, I modified it to do
_messageData = messageBody.duplicate();
4) MessageActorId is not set properly - so I modified the code to set
this. This id is used for the destination
5) When creating BytesMessageImpl, in the constructor, it doesn't read from the underlying
message impl. There for the _readIn is null and results in MessageNotReadableException.
I added a temp solution to read and populate _readIn.
However need to revisit it later
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569238 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
18 files changed, 144 insertions, 55 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java index 55ca885e43..fc89f2d368 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -108,7 +108,6 @@ public class Client implements org.apache.qpidity.client.Connection ClientSession ssn = new ClientSession(); ssn.attach(ch); ssn.sessionOpen(expiryInSeconds); - return ssn; } diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java index 9793d3ad8b..ea225976f2 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -29,7 +29,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa { for (long l = range.getLower(); l <= range.getUpper(); l++) { - System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + System.out.println("Acknowleding transfer id : " + l); super.processed(l); } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java index 17646b631e..c4565d4544 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java @@ -67,10 +67,12 @@ public class ClientSessionDelegate extends SessionDelegate } ((ClientSession)session).setRejectedMessages(struct.getTransfers()); ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); + session.processed(struct); } @Override public void messageAcquired(Session session, MessageAcquired struct) { ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + session.processed(struct); } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index 5394a50448..ed3209c1d0 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -30,7 +30,8 @@ public class ByteBufferMessage implements Message public ByteBufferMessage() { - + _currentDeliveryProps = new DeliveryProperties(); + _currentMessageProps = new MessageProperties(); } public ByteBufferMessage(long transferId) @@ -70,6 +71,7 @@ public class ByteBufferMessage implements Message public MessageProperties getMessageProperties() { + System.out.println("MessageProperties is null ? " + _currentMessageProps == null? "true":"false"); return _currentMessageProps; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java index 75d0aaad05..31440eaaad 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java @@ -68,6 +68,8 @@ public class DestinationImpl implements Destination, Referenceable * Indicates whether this destination is durable */ protected boolean _isDurable; + + protected String _routingKey; /** * The biding URL used to create this destiantion @@ -79,6 +81,7 @@ public class DestinationImpl implements Destination, Referenceable protected DestinationImpl(String name) throws QpidException { _queueName = name; + _routingKey = name; } /** @@ -96,6 +99,7 @@ public class DestinationImpl implements Destination, Referenceable _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName(); + _routingKey = binding.getQueueName(); _url = binding; } @@ -171,6 +175,11 @@ public class DestinationImpl implements Destination, Referenceable return _isAutoDelete; } + public String getRoutingKey() + { + return _routingKey; + } + /** * Indicates whether this destination is Durable. * diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java index fe3ead4155..743cdc4e8c 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java @@ -63,15 +63,16 @@ public abstract class MessageActor //TODO define the parameters - protected MessageActor() + protected MessageActor(String messageActorID) { - + _messageActorID = messageActorID; } - protected MessageActor(SessionImpl session, DestinationImpl destination) + protected MessageActor(SessionImpl session, DestinationImpl destination,String messageActorID) { _session = session; _destination = destination; + _messageActorID = messageActorID; } //--- public methods (part of the jms public API) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 8bab833ddf..c2e8b4a482 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -112,9 +112,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @throws Exception If the MessageProducerImpl cannot be created due to some internal error. */ protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector, - boolean noLocal, String subscriptionName) throws Exception + boolean noLocal, String subscriptionName,String consumerTag) throws Exception { - super(session, destination); + super(session, destination,consumerTag); if (messageSelector != null) { _messageSelector = messageSelector; @@ -167,7 +167,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } // bind this queue with the topic exchange getSession().getQpidSession() - .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getQpidQueueName(), null); + .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null); // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), @@ -183,6 +183,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // set the flow mode getSession().getQpidSession() .messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT); + + // this will prevent the broker from sending more than one message + // When a messageListener is set the flow will be adjusted. + // until then we assume it's for synchronous message consumption + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); + getSession().getQpidSession().sync(); // check for an exception if (getSession().getCurrentException() != null) @@ -347,12 +354,21 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer private Message internalReceive(long timeout) throws Exception { checkNotClosed(); + Message result = null; + if (_messageListener != null) { throw new javax.jms.IllegalStateException("A listener has already been set."); } - - Message result = null; + + if (_incomingMessage != null) + { + System.out.println("We already had a message in the queue"); + result = (Message) _incomingMessage; + _incomingMessage = null; + return result; + } + synchronized (_incomingMessageLock) { // This indicate to the delivery thread to deliver the message to this consumer @@ -366,11 +382,14 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer 1); getSession().getQpidSession().messageFlush(getMessageActorID()); _messageReceived.set(false); - + System.out.println("no message in the queue, issuing a flow(1) and waiting for message"); + //When sync() returns we know whether we have received a message or not. getSession().getQpidSession().sync(); + + System.out.println("we got returned from sync()"); //received = getSession().getQpidSession().messagesReceived(); - } + } if (_messageReceived.get() && timeout < 0) { // this is a nowait and we havent received a message then we must immediatly return @@ -387,6 +406,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { try { + System.out.println("waiting for message"); _incomingMessageLock.wait(timeout); } catch (InterruptedException e) @@ -479,18 +499,24 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // notify the waiting thread if (_messageListener == null) { + System.out.println("Received a message- onMessage in message consumer Impl"); + synchronized (_incomingMessageLock) { if (messageOk) { + System.out.println("Received a message- onMessage in message ok " + messageOk); // we have received a proper message that we can deliver if (_isReceiving) { + System.out.println("Received a message- onMessage in message _isReceiving"); + _incomingMessage = message; _incomingMessageLock.notify(); } else { + System.out.println("Received a message- onMessage in message releasing"); // this message has been received after a received as returned // we need to release it releaseMessage(message); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java index cc71bddafc..6d1d750e82 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java @@ -58,7 +58,7 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer //-- constructors public MessageProducerImpl(SessionImpl session, DestinationImpl destination) { - super(session, destination); + super(session, destination,""); } //--- Interface javax.jms.MessageProducer diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java index 4dbf86a388..57f5da241a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java @@ -17,10 +17,13 @@ */ package org.apache.qpidity.jms; -import org.apache.qpidity.jms.message.QpidMessage; -import org.apache.qpidity.jms.message.MessageFactory; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import org.apache.qpidity.api.Message; import org.apache.qpidity.client.util.MessageListener; +import org.apache.qpidity.jms.message.MessageFactory; +import org.apache.qpidity.jms.message.QpidMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,8 +32,12 @@ import org.slf4j.LoggerFactory; * This is for guarantying that asynch messages are sequentially processed within their session. * <p> when used synchonously, messages are dispatched to the receiver itself. */ -public class QpidMessageListener implements MessageListener +public class QpidMessageListener implements MessageListener, Runnable { + + // temp solution + LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>(); + /** * Used for debugging. */ @@ -50,32 +57,35 @@ public class QpidMessageListener implements MessageListener public QpidMessageListener(MessageConsumerImpl consumer) { _consumer = consumer; + Thread t = new Thread(this); + t.start(); } - - //---- org.apache.qpidity.MessagePartListener API - /** - * Deliver a message to the listener. - * - * @param message The message delivered to the listner. - */ - public void onMessage(Message message) + + public void run() { try { - // to be used with flush - _consumer.notifyMessageReceived(); - - //convert this message into a JMS one - QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); - // if consumer is asynchronous then send this message to its session. - if( _consumer.getMessageListener() != null ) + while(true) { - _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); - } - else - { - // deliver this message to the consumer itself - _consumer.onMessage(jmsMessage); + System.out.println("trying to take a message message"); + Message message = _queue.take(); + + // to be used with flush + System.out.println("processing the message"); + _consumer.notifyMessageReceived(); + + //convert this message into a JMS one + QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); + // if consumer is asynchronous then send this message to its session. + if( _consumer.getMessageListener() != null ) + { + _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); + } + else + { + // deliver this message to the consumer itself + _consumer.onMessage(jmsMessage); + } } } catch (Exception e) @@ -83,4 +93,17 @@ public class QpidMessageListener implements MessageListener throw new RuntimeException(e.getMessage()); } } + + //---- org.apache.qpidity.MessagePartListener API + /** + * Deliver a message to the listener. + * + * @param message The message delivered to the listner. + */ + public void onMessage(Message message) + { + System.out.println("Received a message"); + _queue.offer(message); + System.out.println("Added queue to the message"); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index 41c8bc02c6..d6b27be9a4 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -80,9 +80,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser * @param messageSelector only messages with properties matching the message selector expression are delivered. * @throws Exception In case of internal problem when creating this browser. */ - protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception + protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception { - super(session, (DestinationImpl) queue); + super(session, (DestinationImpl) queue,consumerTag); // this is an array representing a batch of messages for this browser. _messages = new Message[_maxbatchlength]; if (messageSelector != null) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java index 7ef0f151ae..24365a1e07 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java @@ -35,9 +35,9 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei * @param messageSelector the message selector for this QueueReceiverImpl. * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error. */ - protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception + protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception { - super(session, (DestinationImpl) queue, messageSelector, false, null); + super(session, (DestinationImpl) queue, messageSelector, false, null,consumerTag); } //--- Interface QueueReceiver diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java index 767acafe0d..4fe2a340eb 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java @@ -128,7 +128,7 @@ public class QueueSessionImpl extends SessionImpl implements QueueSession QueueReceiver receiver; try { - receiver = new QueueReceiverImpl(this, queue, messageSelector); + receiver = new QueueReceiverImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 57a69277a7..f08fdba373 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -25,12 +25,11 @@ import org.apache.qpidity.RangeSet; import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.MessageListener; -import javax.jms.Session; import java.io.Serializable; import java.util.LinkedList; import java.util.HashMap; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; /** * Implementation of the JMS Session interface @@ -123,6 +122,12 @@ public class SessionImpl implements Session * This session connection */ private ConnectionImpl _connection; + + /** + * This will be used as the message actor id + * This in turn will be set as the destination + */ + protected AtomicInteger _consumerTag = new AtomicInteger(); //--- Constructor /** @@ -594,7 +599,7 @@ public class SessionImpl implements Session MessageConsumerImpl consumer; try { - consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); + consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { @@ -721,7 +726,7 @@ public class SessionImpl implements Session try { subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, - _connection.getClientID() + ":" + name); + _connection.getClientID() + ":" + name,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { @@ -765,7 +770,7 @@ public class SessionImpl implements Session QueueBrowserImpl browser; try { - browser = new QueueBrowserImpl(this, queue, messageSelector); + browser = new QueueBrowserImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { @@ -1114,7 +1119,7 @@ public class SessionImpl implements Session */ protected void testQpidException() throws QpidException { - _qpidSession.sync(); + //_qpidSession.sync(); QpidException qe = getCurrentException(); if (qe != null) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java index 199e882234..22feb29598 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java @@ -42,6 +42,7 @@ public class TopicImpl extends DestinationImpl implements Topic { super(name); _queueName = "Topic-" + UUID.randomUUID(); + _routingKey = name; _destinationName = name; _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; @@ -61,6 +62,7 @@ public class TopicImpl extends DestinationImpl implements Topic { super(name); _queueName = "Topic-" + UUID.randomUUID(); + _routingKey = name; _destinationName = name; _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; @@ -116,7 +118,11 @@ public class TopicImpl extends DestinationImpl implements Topic // test if this exchange exist on the broker session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE); // wait for the broker response + System.out.println("Checking for exchange"); + session.getQpidSession().sync(); + + System.out.println("Calling sync()"); // todo get the exception } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java index 15c9196fb6..21aa2411a8 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java @@ -144,7 +144,7 @@ public class TopicSessionImpl extends SessionImpl implements TopicSession TopicSubscriber topicSubscriber; try { - topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null); + topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java index 9fe03d336d..ae9f59a77f 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java @@ -39,9 +39,9 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub * @throws Exception If the TopicSubscriberImpl cannot be created due to internal error. */ protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal, - String subscriptionName) throws Exception + String subscriptionName,String consumerTag) throws Exception { - super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName); + super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName,consumerTag); } //--- javax.jms.TopicSubscriber interface diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java index ecf6c796d3..68dafc7345 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java @@ -70,6 +70,17 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); + try + { + ByteBuffer b = message.readData(); + byte[] a = new byte[b.limit()]; + b.get(a); + _dataIn = new DataInputStream(new ByteArrayInputStream(a)); + } + catch(Exception e) + { + e.printStackTrace(); + } } //--- BytesMessage API diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index 67121f416a..4ca39cfcec 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -64,6 +64,7 @@ public class QpidMessage { // We us a byteBufferMessage as default _qpidityMessage = new ByteBufferMessage(); + System.out.println("Creating a bytes message"); _messageProperties = new HashMap<String, Object>(); // This is a newly created messsage so the data is empty _messageData = ByteBuffer.allocate(1024); @@ -325,8 +326,8 @@ public class QpidMessage * @param messageBody The buffer containing this message data */ protected void setMessageData(ByteBuffer messageBody) - { - _messageData = messageBody; + { + _messageData = messageBody.duplicate(); } /** @@ -389,7 +390,11 @@ public class QpidMessage // set the message data _qpidityMessage.clearData(); // we need to do a flip - _messageData.flip(); + //_messageData.flip(); + + System.out.println("_messageData POS " + _messageData.position()); + System.out.println("_messageData limit " + _messageData.limit()); + _qpidityMessage.appendData(_messageData); _qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties); } |
