summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-24 04:06:18 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-24 04:06:18 +0000
commit79882b848cdcec28b43f383c20651f7fe95a94c6 (patch)
treeb8d128471184593357db3a33689ad22282184002 /java
parent1d87ab05c2fdff3caa57dbc27f1511541cabd7b8 (diff)
downloadqpid-python-79882b848cdcec28b43f383c20651f7fe95a94c6.tar.gz
Fixed the following issues
1) TopicImpl doesn't populate the routing key properly. The Destination Impl needs to have a routing key field (I added the field). For Topic The queue name is generated. For Queue the routingkey is same as queue name. 2) QpidMessage - Calling flip on messageData resets the limit to zero in beforeMessageDispatch(). I commented out the flip() 3) QpidMessage - setMessageData Instead of _messageData = messageBody, I modified it to do _messageData = messageBody.duplicate(); 4) MessageActorId is not set properly - so I modified the code to set this. This id is used for the destination 5) When creating BytesMessageImpl, in the constructor, it doesn't read from the underlying message impl. There for the _readIn is null and results in MessageNotReadableException. I added a temp solution to read and populate _readIn. However need to revisit it later git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569238 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Client.java1
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java9
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java7
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java40
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java71
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java17
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java11
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java11
18 files changed, 144 insertions, 55 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java
index 55ca885e43..fc89f2d368 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java
@@ -108,7 +108,6 @@ public class Client implements org.apache.qpidity.client.Connection
ClientSession ssn = new ClientSession();
ssn.attach(ch);
ssn.sessionOpen(expiryInSeconds);
-
return ssn;
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
index 9793d3ad8b..ea225976f2 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
@@ -29,7 +29,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
{
for (long l = range.getLower(); l <= range.getUpper(); l++)
{
- System.out.println("Acknowleding message for : " + super.getCommand((int) l));
+ System.out.println("Acknowleding transfer id : " + l);
super.processed(l);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
index 17646b631e..c4565d4544 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
@@ -67,10 +67,12 @@ public class ClientSessionDelegate extends SessionDelegate
}
((ClientSession)session).setRejectedMessages(struct.getTransfers());
((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null));
+ session.processed(struct);
}
@Override public void messageAcquired(Session session, MessageAcquired struct)
{
((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+ session.processed(struct);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
index 5394a50448..ed3209c1d0 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
@@ -30,7 +30,8 @@ public class ByteBufferMessage implements Message
public ByteBufferMessage()
{
-
+ _currentDeliveryProps = new DeliveryProperties();
+ _currentMessageProps = new MessageProperties();
}
public ByteBufferMessage(long transferId)
@@ -70,6 +71,7 @@ public class ByteBufferMessage implements Message
public MessageProperties getMessageProperties()
{
+ System.out.println("MessageProperties is null ? " + _currentMessageProps == null? "true":"false");
return _currentMessageProps;
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
index 75d0aaad05..31440eaaad 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
@@ -68,6 +68,8 @@ public class DestinationImpl implements Destination, Referenceable
* Indicates whether this destination is durable
*/
protected boolean _isDurable;
+
+ protected String _routingKey;
/**
* The biding URL used to create this destiantion
@@ -79,6 +81,7 @@ public class DestinationImpl implements Destination, Referenceable
protected DestinationImpl(String name) throws QpidException
{
_queueName = name;
+ _routingKey = name;
}
/**
@@ -96,6 +99,7 @@ public class DestinationImpl implements Destination, Referenceable
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName();
+ _routingKey = binding.getQueueName();
_url = binding;
}
@@ -171,6 +175,11 @@ public class DestinationImpl implements Destination, Referenceable
return _isAutoDelete;
}
+ public String getRoutingKey()
+ {
+ return _routingKey;
+ }
+
/**
* Indicates whether this destination is Durable.
*
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
index fe3ead4155..743cdc4e8c 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
@@ -63,15 +63,16 @@ public abstract class MessageActor
//TODO define the parameters
- protected MessageActor()
+ protected MessageActor(String messageActorID)
{
-
+ _messageActorID = messageActorID;
}
- protected MessageActor(SessionImpl session, DestinationImpl destination)
+ protected MessageActor(SessionImpl session, DestinationImpl destination,String messageActorID)
{
_session = session;
_destination = destination;
+ _messageActorID = messageActorID;
}
//--- public methods (part of the jms public API)
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 8bab833ddf..c2e8b4a482 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -112,9 +112,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* @throws Exception If the MessageProducerImpl cannot be created due to some internal error.
*/
protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
- boolean noLocal, String subscriptionName) throws Exception
+ boolean noLocal, String subscriptionName,String consumerTag) throws Exception
{
- super(session, destination);
+ super(session, destination,consumerTag);
if (messageSelector != null)
{
_messageSelector = messageSelector;
@@ -167,7 +167,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
}
// bind this queue with the topic exchange
getSession().getQpidSession()
- .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getQpidQueueName(), null);
+ .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
// subscribe to this topic
getSession().getQpidSession()
.messageSubscribe(queueName, getMessageActorID(),
@@ -183,6 +183,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
+
+ // this will prevent the broker from sending more than one message
+ // When a messageListener is set the flow will be adjusted.
+ // until then we assume it's for synchronous message consumption
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+
getSession().getQpidSession().sync();
// check for an exception
if (getSession().getCurrentException() != null)
@@ -347,12 +354,21 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
private Message internalReceive(long timeout) throws Exception
{
checkNotClosed();
+ Message result = null;
+
if (_messageListener != null)
{
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
-
- Message result = null;
+
+ if (_incomingMessage != null)
+ {
+ System.out.println("We already had a message in the queue");
+ result = (Message) _incomingMessage;
+ _incomingMessage = null;
+ return result;
+ }
+
synchronized (_incomingMessageLock)
{
// This indicate to the delivery thread to deliver the message to this consumer
@@ -366,11 +382,14 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
1);
getSession().getQpidSession().messageFlush(getMessageActorID());
_messageReceived.set(false);
-
+ System.out.println("no message in the queue, issuing a flow(1) and waiting for message");
+
//When sync() returns we know whether we have received a message or not.
getSession().getQpidSession().sync();
+
+ System.out.println("we got returned from sync()");
//received = getSession().getQpidSession().messagesReceived();
- }
+ }
if (_messageReceived.get() && timeout < 0)
{
// this is a nowait and we havent received a message then we must immediatly return
@@ -387,6 +406,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
try
{
+ System.out.println("waiting for message");
_incomingMessageLock.wait(timeout);
}
catch (InterruptedException e)
@@ -479,18 +499,24 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// notify the waiting thread
if (_messageListener == null)
{
+ System.out.println("Received a message- onMessage in message consumer Impl");
+
synchronized (_incomingMessageLock)
{
if (messageOk)
{
+ System.out.println("Received a message- onMessage in message ok " + messageOk);
// we have received a proper message that we can deliver
if (_isReceiving)
{
+ System.out.println("Received a message- onMessage in message _isReceiving");
+
_incomingMessage = message;
_incomingMessageLock.notify();
}
else
{
+ System.out.println("Received a message- onMessage in message releasing");
// this message has been received after a received as returned
// we need to release it
releaseMessage(message);
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
index cc71bddafc..6d1d750e82 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
@@ -58,7 +58,7 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer
//-- constructors
public MessageProducerImpl(SessionImpl session, DestinationImpl destination)
{
- super(session, destination);
+ super(session, destination,"");
}
//--- Interface javax.jms.MessageProducer
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
index 4dbf86a388..57f5da241a 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
@@ -17,10 +17,13 @@
*/
package org.apache.qpidity.jms;
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.jms.message.MessageFactory;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.util.MessageListener;
+import org.apache.qpidity.jms.message.MessageFactory;
+import org.apache.qpidity.jms.message.QpidMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,8 +32,12 @@ import org.slf4j.LoggerFactory;
* This is for guarantying that asynch messages are sequentially processed within their session.
* <p> when used synchonously, messages are dispatched to the receiver itself.
*/
-public class QpidMessageListener implements MessageListener
+public class QpidMessageListener implements MessageListener, Runnable
{
+
+ // temp solution
+ LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>();
+
/**
* Used for debugging.
*/
@@ -50,32 +57,35 @@ public class QpidMessageListener implements MessageListener
public QpidMessageListener(MessageConsumerImpl consumer)
{
_consumer = consumer;
+ Thread t = new Thread(this);
+ t.start();
}
-
- //---- org.apache.qpidity.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
+
+ public void run()
{
try
{
- // to be used with flush
- _consumer.notifyMessageReceived();
-
- //convert this message into a JMS one
- QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
- // if consumer is asynchronous then send this message to its session.
- if( _consumer.getMessageListener() != null )
+ while(true)
{
- _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
- }
- else
- {
- // deliver this message to the consumer itself
- _consumer.onMessage(jmsMessage);
+ System.out.println("trying to take a message message");
+ Message message = _queue.take();
+
+ // to be used with flush
+ System.out.println("processing the message");
+ _consumer.notifyMessageReceived();
+
+ //convert this message into a JMS one
+ QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+ // if consumer is asynchronous then send this message to its session.
+ if( _consumer.getMessageListener() != null )
+ {
+ _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
+ }
+ else
+ {
+ // deliver this message to the consumer itself
+ _consumer.onMessage(jmsMessage);
+ }
}
}
catch (Exception e)
@@ -83,4 +93,17 @@ public class QpidMessageListener implements MessageListener
throw new RuntimeException(e.getMessage());
}
}
+
+ //---- org.apache.qpidity.MessagePartListener API
+ /**
+ * Deliver a message to the listener.
+ *
+ * @param message The message delivered to the listner.
+ */
+ public void onMessage(Message message)
+ {
+ System.out.println("Received a message");
+ _queue.offer(message);
+ System.out.println("Added queue to the message");
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
index 41c8bc02c6..d6b27be9a4 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
@@ -80,9 +80,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
* @param messageSelector only messages with properties matching the message selector expression are delivered.
* @throws Exception In case of internal problem when creating this browser.
*/
- protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception
+ protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
{
- super(session, (DestinationImpl) queue);
+ super(session, (DestinationImpl) queue,consumerTag);
// this is an array representing a batch of messages for this browser.
_messages = new Message[_maxbatchlength];
if (messageSelector != null)
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
index 7ef0f151ae..24365a1e07 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
@@ -35,9 +35,9 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei
* @param messageSelector the message selector for this QueueReceiverImpl.
* @throws Exception If the QueueReceiverImpl cannot be created due to some internal error.
*/
- protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception
+ protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
{
- super(session, (DestinationImpl) queue, messageSelector, false, null);
+ super(session, (DestinationImpl) queue, messageSelector, false, null,consumerTag);
}
//--- Interface QueueReceiver
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
index 767acafe0d..4fe2a340eb 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
@@ -128,7 +128,7 @@ public class QueueSessionImpl extends SessionImpl implements QueueSession
QueueReceiver receiver;
try
{
- receiver = new QueueReceiverImpl(this, queue, messageSelector);
+ receiver = new QueueReceiverImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 57a69277a7..f08fdba373 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -25,12 +25,11 @@ import org.apache.qpidity.RangeSet;
import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Implementation of the JMS Session interface
@@ -123,6 +122,12 @@ public class SessionImpl implements Session
* This session connection
*/
private ConnectionImpl _connection;
+
+ /**
+ * This will be used as the message actor id
+ * This in turn will be set as the destination
+ */
+ protected AtomicInteger _consumerTag = new AtomicInteger();
//--- Constructor
/**
@@ -594,7 +599,7 @@ public class SessionImpl implements Session
MessageConsumerImpl consumer;
try
{
- consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
+ consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -721,7 +726,7 @@ public class SessionImpl implements Session
try
{
subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal,
- _connection.getClientID() + ":" + name);
+ _connection.getClientID() + ":" + name,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -765,7 +770,7 @@ public class SessionImpl implements Session
QueueBrowserImpl browser;
try
{
- browser = new QueueBrowserImpl(this, queue, messageSelector);
+ browser = new QueueBrowserImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -1114,7 +1119,7 @@ public class SessionImpl implements Session
*/
protected void testQpidException() throws QpidException
{
- _qpidSession.sync();
+ //_qpidSession.sync();
QpidException qe = getCurrentException();
if (qe != null)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
index 199e882234..22feb29598 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
@@ -42,6 +42,7 @@ public class TopicImpl extends DestinationImpl implements Topic
{
super(name);
_queueName = "Topic-" + UUID.randomUUID();
+ _routingKey = name;
_destinationName = name;
_exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
_exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -61,6 +62,7 @@ public class TopicImpl extends DestinationImpl implements Topic
{
super(name);
_queueName = "Topic-" + UUID.randomUUID();
+ _routingKey = name;
_destinationName = name;
_exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
_exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -116,7 +118,11 @@ public class TopicImpl extends DestinationImpl implements Topic
// test if this exchange exist on the broker
session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
// wait for the broker response
+ System.out.println("Checking for exchange");
+
session.getQpidSession().sync();
+
+ System.out.println("Calling sync()");
// todo get the exception
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
index 15c9196fb6..21aa2411a8 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
@@ -144,7 +144,7 @@ public class TopicSessionImpl extends SessionImpl implements TopicSession
TopicSubscriber topicSubscriber;
try
{
- topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null);
+ topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
index 9fe03d336d..ae9f59a77f 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
@@ -39,9 +39,9 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
* @throws Exception If the TopicSubscriberImpl cannot be created due to internal error.
*/
protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal,
- String subscriptionName) throws Exception
+ String subscriptionName,String consumerTag) throws Exception
{
- super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName);
+ super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName,consumerTag);
}
//--- javax.jms.TopicSubscriber interface
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
index ecf6c796d3..68dafc7345 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
@@ -70,6 +70,17 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException
{
super(message);
+ try
+ {
+ ByteBuffer b = message.readData();
+ byte[] a = new byte[b.limit()];
+ b.get(a);
+ _dataIn = new DataInputStream(new ByteArrayInputStream(a));
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
}
//--- BytesMessage API
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
index 67121f416a..4ca39cfcec 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
@@ -64,6 +64,7 @@ public class QpidMessage
{
// We us a byteBufferMessage as default
_qpidityMessage = new ByteBufferMessage();
+ System.out.println("Creating a bytes message");
_messageProperties = new HashMap<String, Object>();
// This is a newly created messsage so the data is empty
_messageData = ByteBuffer.allocate(1024);
@@ -325,8 +326,8 @@ public class QpidMessage
* @param messageBody The buffer containing this message data
*/
protected void setMessageData(ByteBuffer messageBody)
- {
- _messageData = messageBody;
+ {
+ _messageData = messageBody.duplicate();
}
/**
@@ -389,7 +390,11 @@ public class QpidMessage
// set the message data
_qpidityMessage.clearData();
// we need to do a flip
- _messageData.flip();
+ //_messageData.flip();
+
+ System.out.println("_messageData POS " + _messageData.position());
+ System.out.println("_messageData limit " + _messageData.limit());
+
_qpidityMessage.appendData(_messageData);
_qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties);
}