summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Client.java1
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java9
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java7
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java40
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java71
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java17
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java11
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java11
18 files changed, 144 insertions, 55 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java
index 55ca885e43..fc89f2d368 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java
@@ -108,7 +108,6 @@ public class Client implements org.apache.qpidity.client.Connection
ClientSession ssn = new ClientSession();
ssn.attach(ch);
ssn.sessionOpen(expiryInSeconds);
-
return ssn;
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
index 9793d3ad8b..ea225976f2 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
@@ -29,7 +29,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
{
for (long l = range.getLower(); l <= range.getUpper(); l++)
{
- System.out.println("Acknowleding message for : " + super.getCommand((int) l));
+ System.out.println("Acknowleding transfer id : " + l);
super.processed(l);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
index 17646b631e..c4565d4544 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
@@ -67,10 +67,12 @@ public class ClientSessionDelegate extends SessionDelegate
}
((ClientSession)session).setRejectedMessages(struct.getTransfers());
((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null));
+ session.processed(struct);
}
@Override public void messageAcquired(Session session, MessageAcquired struct)
{
((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+ session.processed(struct);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
index 5394a50448..ed3209c1d0 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
@@ -30,7 +30,8 @@ public class ByteBufferMessage implements Message
public ByteBufferMessage()
{
-
+ _currentDeliveryProps = new DeliveryProperties();
+ _currentMessageProps = new MessageProperties();
}
public ByteBufferMessage(long transferId)
@@ -70,6 +71,7 @@ public class ByteBufferMessage implements Message
public MessageProperties getMessageProperties()
{
+ System.out.println("MessageProperties is null ? " + _currentMessageProps == null? "true":"false");
return _currentMessageProps;
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
index 75d0aaad05..31440eaaad 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
@@ -68,6 +68,8 @@ public class DestinationImpl implements Destination, Referenceable
* Indicates whether this destination is durable
*/
protected boolean _isDurable;
+
+ protected String _routingKey;
/**
* The biding URL used to create this destiantion
@@ -79,6 +81,7 @@ public class DestinationImpl implements Destination, Referenceable
protected DestinationImpl(String name) throws QpidException
{
_queueName = name;
+ _routingKey = name;
}
/**
@@ -96,6 +99,7 @@ public class DestinationImpl implements Destination, Referenceable
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName();
+ _routingKey = binding.getQueueName();
_url = binding;
}
@@ -171,6 +175,11 @@ public class DestinationImpl implements Destination, Referenceable
return _isAutoDelete;
}
+ public String getRoutingKey()
+ {
+ return _routingKey;
+ }
+
/**
* Indicates whether this destination is Durable.
*
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
index fe3ead4155..743cdc4e8c 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
@@ -63,15 +63,16 @@ public abstract class MessageActor
//TODO define the parameters
- protected MessageActor()
+ protected MessageActor(String messageActorID)
{
-
+ _messageActorID = messageActorID;
}
- protected MessageActor(SessionImpl session, DestinationImpl destination)
+ protected MessageActor(SessionImpl session, DestinationImpl destination,String messageActorID)
{
_session = session;
_destination = destination;
+ _messageActorID = messageActorID;
}
//--- public methods (part of the jms public API)
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 8bab833ddf..c2e8b4a482 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -112,9 +112,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* @throws Exception If the MessageProducerImpl cannot be created due to some internal error.
*/
protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
- boolean noLocal, String subscriptionName) throws Exception
+ boolean noLocal, String subscriptionName,String consumerTag) throws Exception
{
- super(session, destination);
+ super(session, destination,consumerTag);
if (messageSelector != null)
{
_messageSelector = messageSelector;
@@ -167,7 +167,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
}
// bind this queue with the topic exchange
getSession().getQpidSession()
- .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getQpidQueueName(), null);
+ .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
// subscribe to this topic
getSession().getQpidSession()
.messageSubscribe(queueName, getMessageActorID(),
@@ -183,6 +183,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
+
+ // this will prevent the broker from sending more than one message
+ // When a messageListener is set the flow will be adjusted.
+ // until then we assume it's for synchronous message consumption
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+
getSession().getQpidSession().sync();
// check for an exception
if (getSession().getCurrentException() != null)
@@ -347,12 +354,21 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
private Message internalReceive(long timeout) throws Exception
{
checkNotClosed();
+ Message result = null;
+
if (_messageListener != null)
{
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
-
- Message result = null;
+
+ if (_incomingMessage != null)
+ {
+ System.out.println("We already had a message in the queue");
+ result = (Message) _incomingMessage;
+ _incomingMessage = null;
+ return result;
+ }
+
synchronized (_incomingMessageLock)
{
// This indicate to the delivery thread to deliver the message to this consumer
@@ -366,11 +382,14 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
1);
getSession().getQpidSession().messageFlush(getMessageActorID());
_messageReceived.set(false);
-
+ System.out.println("no message in the queue, issuing a flow(1) and waiting for message");
+
//When sync() returns we know whether we have received a message or not.
getSession().getQpidSession().sync();
+
+ System.out.println("we got returned from sync()");
//received = getSession().getQpidSession().messagesReceived();
- }
+ }
if (_messageReceived.get() && timeout < 0)
{
// this is a nowait and we havent received a message then we must immediatly return
@@ -387,6 +406,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
try
{
+ System.out.println("waiting for message");
_incomingMessageLock.wait(timeout);
}
catch (InterruptedException e)
@@ -479,18 +499,24 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// notify the waiting thread
if (_messageListener == null)
{
+ System.out.println("Received a message- onMessage in message consumer Impl");
+
synchronized (_incomingMessageLock)
{
if (messageOk)
{
+ System.out.println("Received a message- onMessage in message ok " + messageOk);
// we have received a proper message that we can deliver
if (_isReceiving)
{
+ System.out.println("Received a message- onMessage in message _isReceiving");
+
_incomingMessage = message;
_incomingMessageLock.notify();
}
else
{
+ System.out.println("Received a message- onMessage in message releasing");
// this message has been received after a received as returned
// we need to release it
releaseMessage(message);
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
index cc71bddafc..6d1d750e82 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
@@ -58,7 +58,7 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer
//-- constructors
public MessageProducerImpl(SessionImpl session, DestinationImpl destination)
{
- super(session, destination);
+ super(session, destination,"");
}
//--- Interface javax.jms.MessageProducer
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
index 4dbf86a388..57f5da241a 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
@@ -17,10 +17,13 @@
*/
package org.apache.qpidity.jms;
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.jms.message.MessageFactory;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.util.MessageListener;
+import org.apache.qpidity.jms.message.MessageFactory;
+import org.apache.qpidity.jms.message.QpidMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,8 +32,12 @@ import org.slf4j.LoggerFactory;
* This is for guarantying that asynch messages are sequentially processed within their session.
* <p> when used synchonously, messages are dispatched to the receiver itself.
*/
-public class QpidMessageListener implements MessageListener
+public class QpidMessageListener implements MessageListener, Runnable
{
+
+ // temp solution
+ LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>();
+
/**
* Used for debugging.
*/
@@ -50,32 +57,35 @@ public class QpidMessageListener implements MessageListener
public QpidMessageListener(MessageConsumerImpl consumer)
{
_consumer = consumer;
+ Thread t = new Thread(this);
+ t.start();
}
-
- //---- org.apache.qpidity.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
+
+ public void run()
{
try
{
- // to be used with flush
- _consumer.notifyMessageReceived();
-
- //convert this message into a JMS one
- QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
- // if consumer is asynchronous then send this message to its session.
- if( _consumer.getMessageListener() != null )
+ while(true)
{
- _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
- }
- else
- {
- // deliver this message to the consumer itself
- _consumer.onMessage(jmsMessage);
+ System.out.println("trying to take a message message");
+ Message message = _queue.take();
+
+ // to be used with flush
+ System.out.println("processing the message");
+ _consumer.notifyMessageReceived();
+
+ //convert this message into a JMS one
+ QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+ // if consumer is asynchronous then send this message to its session.
+ if( _consumer.getMessageListener() != null )
+ {
+ _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
+ }
+ else
+ {
+ // deliver this message to the consumer itself
+ _consumer.onMessage(jmsMessage);
+ }
}
}
catch (Exception e)
@@ -83,4 +93,17 @@ public class QpidMessageListener implements MessageListener
throw new RuntimeException(e.getMessage());
}
}
+
+ //---- org.apache.qpidity.MessagePartListener API
+ /**
+ * Deliver a message to the listener.
+ *
+ * @param message The message delivered to the listner.
+ */
+ public void onMessage(Message message)
+ {
+ System.out.println("Received a message");
+ _queue.offer(message);
+ System.out.println("Added queue to the message");
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
index 41c8bc02c6..d6b27be9a4 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
@@ -80,9 +80,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
* @param messageSelector only messages with properties matching the message selector expression are delivered.
* @throws Exception In case of internal problem when creating this browser.
*/
- protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception
+ protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
{
- super(session, (DestinationImpl) queue);
+ super(session, (DestinationImpl) queue,consumerTag);
// this is an array representing a batch of messages for this browser.
_messages = new Message[_maxbatchlength];
if (messageSelector != null)
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
index 7ef0f151ae..24365a1e07 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
@@ -35,9 +35,9 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei
* @param messageSelector the message selector for this QueueReceiverImpl.
* @throws Exception If the QueueReceiverImpl cannot be created due to some internal error.
*/
- protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception
+ protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
{
- super(session, (DestinationImpl) queue, messageSelector, false, null);
+ super(session, (DestinationImpl) queue, messageSelector, false, null,consumerTag);
}
//--- Interface QueueReceiver
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
index 767acafe0d..4fe2a340eb 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
@@ -128,7 +128,7 @@ public class QueueSessionImpl extends SessionImpl implements QueueSession
QueueReceiver receiver;
try
{
- receiver = new QueueReceiverImpl(this, queue, messageSelector);
+ receiver = new QueueReceiverImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 57a69277a7..f08fdba373 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -25,12 +25,11 @@ import org.apache.qpidity.RangeSet;
import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Implementation of the JMS Session interface
@@ -123,6 +122,12 @@ public class SessionImpl implements Session
* This session connection
*/
private ConnectionImpl _connection;
+
+ /**
+ * This will be used as the message actor id
+ * This in turn will be set as the destination
+ */
+ protected AtomicInteger _consumerTag = new AtomicInteger();
//--- Constructor
/**
@@ -594,7 +599,7 @@ public class SessionImpl implements Session
MessageConsumerImpl consumer;
try
{
- consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
+ consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -721,7 +726,7 @@ public class SessionImpl implements Session
try
{
subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal,
- _connection.getClientID() + ":" + name);
+ _connection.getClientID() + ":" + name,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -765,7 +770,7 @@ public class SessionImpl implements Session
QueueBrowserImpl browser;
try
{
- browser = new QueueBrowserImpl(this, queue, messageSelector);
+ browser = new QueueBrowserImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -1114,7 +1119,7 @@ public class SessionImpl implements Session
*/
protected void testQpidException() throws QpidException
{
- _qpidSession.sync();
+ //_qpidSession.sync();
QpidException qe = getCurrentException();
if (qe != null)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
index 199e882234..22feb29598 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
@@ -42,6 +42,7 @@ public class TopicImpl extends DestinationImpl implements Topic
{
super(name);
_queueName = "Topic-" + UUID.randomUUID();
+ _routingKey = name;
_destinationName = name;
_exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
_exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -61,6 +62,7 @@ public class TopicImpl extends DestinationImpl implements Topic
{
super(name);
_queueName = "Topic-" + UUID.randomUUID();
+ _routingKey = name;
_destinationName = name;
_exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
_exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -116,7 +118,11 @@ public class TopicImpl extends DestinationImpl implements Topic
// test if this exchange exist on the broker
session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
// wait for the broker response
+ System.out.println("Checking for exchange");
+
session.getQpidSession().sync();
+
+ System.out.println("Calling sync()");
// todo get the exception
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
index 15c9196fb6..21aa2411a8 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
@@ -144,7 +144,7 @@ public class TopicSessionImpl extends SessionImpl implements TopicSession
TopicSubscriber topicSubscriber;
try
{
- topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null);
+ topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
index 9fe03d336d..ae9f59a77f 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
@@ -39,9 +39,9 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
* @throws Exception If the TopicSubscriberImpl cannot be created due to internal error.
*/
protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal,
- String subscriptionName) throws Exception
+ String subscriptionName,String consumerTag) throws Exception
{
- super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName);
+ super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName,consumerTag);
}
//--- javax.jms.TopicSubscriber interface
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
index ecf6c796d3..68dafc7345 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
@@ -70,6 +70,17 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException
{
super(message);
+ try
+ {
+ ByteBuffer b = message.readData();
+ byte[] a = new byte[b.limit()];
+ b.get(a);
+ _dataIn = new DataInputStream(new ByteArrayInputStream(a));
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
}
//--- BytesMessage API
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
index 67121f416a..4ca39cfcec 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
@@ -64,6 +64,7 @@ public class QpidMessage
{
// We us a byteBufferMessage as default
_qpidityMessage = new ByteBufferMessage();
+ System.out.println("Creating a bytes message");
_messageProperties = new HashMap<String, Object>();
// This is a newly created messsage so the data is empty
_messageData = ByteBuffer.allocate(1024);
@@ -325,8 +326,8 @@ public class QpidMessage
* @param messageBody The buffer containing this message data
*/
protected void setMessageData(ByteBuffer messageBody)
- {
- _messageData = messageBody;
+ {
+ _messageData = messageBody.duplicate();
}
/**
@@ -389,7 +390,11 @@ public class QpidMessage
// set the message data
_qpidityMessage.clearData();
// we need to do a flip
- _messageData.flip();
+ //_messageData.flip();
+
+ System.out.println("_messageData POS " + _messageData.position());
+ System.out.println("_messageData limit " + _messageData.limit());
+
_qpidityMessage.appendData(_messageData);
_qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties);
}