diff options
Diffstat (limited to 'java')
18 files changed, 144 insertions, 55 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java index 55ca885e43..fc89f2d368 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -108,7 +108,6 @@ public class Client implements org.apache.qpidity.client.Connection ClientSession ssn = new ClientSession(); ssn.attach(ch); ssn.sessionOpen(expiryInSeconds); - return ssn; } diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java index 9793d3ad8b..ea225976f2 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -29,7 +29,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa { for (long l = range.getLower(); l <= range.getUpper(); l++) { - System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + System.out.println("Acknowleding transfer id : " + l); super.processed(l); } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java index 17646b631e..c4565d4544 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java @@ -67,10 +67,12 @@ public class ClientSessionDelegate extends SessionDelegate } ((ClientSession)session).setRejectedMessages(struct.getTransfers()); ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); + session.processed(struct); } @Override public void messageAcquired(Session session, MessageAcquired struct) { ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + session.processed(struct); } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index 5394a50448..ed3209c1d0 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -30,7 +30,8 @@ public class ByteBufferMessage implements Message public ByteBufferMessage() { - + _currentDeliveryProps = new DeliveryProperties(); + _currentMessageProps = new MessageProperties(); } public ByteBufferMessage(long transferId) @@ -70,6 +71,7 @@ public class ByteBufferMessage implements Message public MessageProperties getMessageProperties() { + System.out.println("MessageProperties is null ? " + _currentMessageProps == null? "true":"false"); return _currentMessageProps; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java index 75d0aaad05..31440eaaad 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java @@ -68,6 +68,8 @@ public class DestinationImpl implements Destination, Referenceable * Indicates whether this destination is durable */ protected boolean _isDurable; + + protected String _routingKey; /** * The biding URL used to create this destiantion @@ -79,6 +81,7 @@ public class DestinationImpl implements Destination, Referenceable protected DestinationImpl(String name) throws QpidException { _queueName = name; + _routingKey = name; } /** @@ -96,6 +99,7 @@ public class DestinationImpl implements Destination, Referenceable _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName(); + _routingKey = binding.getQueueName(); _url = binding; } @@ -171,6 +175,11 @@ public class DestinationImpl implements Destination, Referenceable return _isAutoDelete; } + public String getRoutingKey() + { + return _routingKey; + } + /** * Indicates whether this destination is Durable. * diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java index fe3ead4155..743cdc4e8c 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java @@ -63,15 +63,16 @@ public abstract class MessageActor //TODO define the parameters - protected MessageActor() + protected MessageActor(String messageActorID) { - + _messageActorID = messageActorID; } - protected MessageActor(SessionImpl session, DestinationImpl destination) + protected MessageActor(SessionImpl session, DestinationImpl destination,String messageActorID) { _session = session; _destination = destination; + _messageActorID = messageActorID; } //--- public methods (part of the jms public API) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 8bab833ddf..c2e8b4a482 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -112,9 +112,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @throws Exception If the MessageProducerImpl cannot be created due to some internal error. */ protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector, - boolean noLocal, String subscriptionName) throws Exception + boolean noLocal, String subscriptionName,String consumerTag) throws Exception { - super(session, destination); + super(session, destination,consumerTag); if (messageSelector != null) { _messageSelector = messageSelector; @@ -167,7 +167,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } // bind this queue with the topic exchange getSession().getQpidSession() - .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getQpidQueueName(), null); + .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null); // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), @@ -183,6 +183,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // set the flow mode getSession().getQpidSession() .messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT); + + // this will prevent the broker from sending more than one message + // When a messageListener is set the flow will be adjusted. + // until then we assume it's for synchronous message consumption + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); + getSession().getQpidSession().sync(); // check for an exception if (getSession().getCurrentException() != null) @@ -347,12 +354,21 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer private Message internalReceive(long timeout) throws Exception { checkNotClosed(); + Message result = null; + if (_messageListener != null) { throw new javax.jms.IllegalStateException("A listener has already been set."); } - - Message result = null; + + if (_incomingMessage != null) + { + System.out.println("We already had a message in the queue"); + result = (Message) _incomingMessage; + _incomingMessage = null; + return result; + } + synchronized (_incomingMessageLock) { // This indicate to the delivery thread to deliver the message to this consumer @@ -366,11 +382,14 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer 1); getSession().getQpidSession().messageFlush(getMessageActorID()); _messageReceived.set(false); - + System.out.println("no message in the queue, issuing a flow(1) and waiting for message"); + //When sync() returns we know whether we have received a message or not. getSession().getQpidSession().sync(); + + System.out.println("we got returned from sync()"); //received = getSession().getQpidSession().messagesReceived(); - } + } if (_messageReceived.get() && timeout < 0) { // this is a nowait and we havent received a message then we must immediatly return @@ -387,6 +406,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { try { + System.out.println("waiting for message"); _incomingMessageLock.wait(timeout); } catch (InterruptedException e) @@ -479,18 +499,24 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // notify the waiting thread if (_messageListener == null) { + System.out.println("Received a message- onMessage in message consumer Impl"); + synchronized (_incomingMessageLock) { if (messageOk) { + System.out.println("Received a message- onMessage in message ok " + messageOk); // we have received a proper message that we can deliver if (_isReceiving) { + System.out.println("Received a message- onMessage in message _isReceiving"); + _incomingMessage = message; _incomingMessageLock.notify(); } else { + System.out.println("Received a message- onMessage in message releasing"); // this message has been received after a received as returned // we need to release it releaseMessage(message); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java index cc71bddafc..6d1d750e82 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java @@ -58,7 +58,7 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer //-- constructors public MessageProducerImpl(SessionImpl session, DestinationImpl destination) { - super(session, destination); + super(session, destination,""); } //--- Interface javax.jms.MessageProducer diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java index 4dbf86a388..57f5da241a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java @@ -17,10 +17,13 @@ */ package org.apache.qpidity.jms; -import org.apache.qpidity.jms.message.QpidMessage; -import org.apache.qpidity.jms.message.MessageFactory; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import org.apache.qpidity.api.Message; import org.apache.qpidity.client.util.MessageListener; +import org.apache.qpidity.jms.message.MessageFactory; +import org.apache.qpidity.jms.message.QpidMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,8 +32,12 @@ import org.slf4j.LoggerFactory; * This is for guarantying that asynch messages are sequentially processed within their session. * <p> when used synchonously, messages are dispatched to the receiver itself. */ -public class QpidMessageListener implements MessageListener +public class QpidMessageListener implements MessageListener, Runnable { + + // temp solution + LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>(); + /** * Used for debugging. */ @@ -50,32 +57,35 @@ public class QpidMessageListener implements MessageListener public QpidMessageListener(MessageConsumerImpl consumer) { _consumer = consumer; + Thread t = new Thread(this); + t.start(); } - - //---- org.apache.qpidity.MessagePartListener API - /** - * Deliver a message to the listener. - * - * @param message The message delivered to the listner. - */ - public void onMessage(Message message) + + public void run() { try { - // to be used with flush - _consumer.notifyMessageReceived(); - - //convert this message into a JMS one - QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); - // if consumer is asynchronous then send this message to its session. - if( _consumer.getMessageListener() != null ) + while(true) { - _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); - } - else - { - // deliver this message to the consumer itself - _consumer.onMessage(jmsMessage); + System.out.println("trying to take a message message"); + Message message = _queue.take(); + + // to be used with flush + System.out.println("processing the message"); + _consumer.notifyMessageReceived(); + + //convert this message into a JMS one + QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); + // if consumer is asynchronous then send this message to its session. + if( _consumer.getMessageListener() != null ) + { + _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); + } + else + { + // deliver this message to the consumer itself + _consumer.onMessage(jmsMessage); + } } } catch (Exception e) @@ -83,4 +93,17 @@ public class QpidMessageListener implements MessageListener throw new RuntimeException(e.getMessage()); } } + + //---- org.apache.qpidity.MessagePartListener API + /** + * Deliver a message to the listener. + * + * @param message The message delivered to the listner. + */ + public void onMessage(Message message) + { + System.out.println("Received a message"); + _queue.offer(message); + System.out.println("Added queue to the message"); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index 41c8bc02c6..d6b27be9a4 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -80,9 +80,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser * @param messageSelector only messages with properties matching the message selector expression are delivered. * @throws Exception In case of internal problem when creating this browser. */ - protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception + protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception { - super(session, (DestinationImpl) queue); + super(session, (DestinationImpl) queue,consumerTag); // this is an array representing a batch of messages for this browser. _messages = new Message[_maxbatchlength]; if (messageSelector != null) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java index 7ef0f151ae..24365a1e07 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java @@ -35,9 +35,9 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei * @param messageSelector the message selector for this QueueReceiverImpl. * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error. */ - protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception + protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception { - super(session, (DestinationImpl) queue, messageSelector, false, null); + super(session, (DestinationImpl) queue, messageSelector, false, null,consumerTag); } //--- Interface QueueReceiver diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java index 767acafe0d..4fe2a340eb 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java @@ -128,7 +128,7 @@ public class QueueSessionImpl extends SessionImpl implements QueueSession QueueReceiver receiver; try { - receiver = new QueueReceiverImpl(this, queue, messageSelector); + receiver = new QueueReceiverImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 57a69277a7..f08fdba373 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -25,12 +25,11 @@ import org.apache.qpidity.RangeSet; import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.MessageListener; -import javax.jms.Session; import java.io.Serializable; import java.util.LinkedList; import java.util.HashMap; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; /** * Implementation of the JMS Session interface @@ -123,6 +122,12 @@ public class SessionImpl implements Session * This session connection */ private ConnectionImpl _connection; + + /** + * This will be used as the message actor id + * This in turn will be set as the destination + */ + protected AtomicInteger _consumerTag = new AtomicInteger(); //--- Constructor /** @@ -594,7 +599,7 @@ public class SessionImpl implements Session MessageConsumerImpl consumer; try { - consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); + consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { @@ -721,7 +726,7 @@ public class SessionImpl implements Session try { subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, - _connection.getClientID() + ":" + name); + _connection.getClientID() + ":" + name,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { @@ -765,7 +770,7 @@ public class SessionImpl implements Session QueueBrowserImpl browser; try { - browser = new QueueBrowserImpl(this, queue, messageSelector); + browser = new QueueBrowserImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { @@ -1114,7 +1119,7 @@ public class SessionImpl implements Session */ protected void testQpidException() throws QpidException { - _qpidSession.sync(); + //_qpidSession.sync(); QpidException qe = getCurrentException(); if (qe != null) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java index 199e882234..22feb29598 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java @@ -42,6 +42,7 @@ public class TopicImpl extends DestinationImpl implements Topic { super(name); _queueName = "Topic-" + UUID.randomUUID(); + _routingKey = name; _destinationName = name; _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; @@ -61,6 +62,7 @@ public class TopicImpl extends DestinationImpl implements Topic { super(name); _queueName = "Topic-" + UUID.randomUUID(); + _routingKey = name; _destinationName = name; _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; @@ -116,7 +118,11 @@ public class TopicImpl extends DestinationImpl implements Topic // test if this exchange exist on the broker session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE); // wait for the broker response + System.out.println("Checking for exchange"); + session.getQpidSession().sync(); + + System.out.println("Calling sync()"); // todo get the exception } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java index 15c9196fb6..21aa2411a8 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java @@ -144,7 +144,7 @@ public class TopicSessionImpl extends SessionImpl implements TopicSession TopicSubscriber topicSubscriber; try { - topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null); + topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet())); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java index 9fe03d336d..ae9f59a77f 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java @@ -39,9 +39,9 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub * @throws Exception If the TopicSubscriberImpl cannot be created due to internal error. */ protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal, - String subscriptionName) throws Exception + String subscriptionName,String consumerTag) throws Exception { - super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName); + super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName,consumerTag); } //--- javax.jms.TopicSubscriber interface diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java index ecf6c796d3..68dafc7345 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java @@ -70,6 +70,17 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); + try + { + ByteBuffer b = message.readData(); + byte[] a = new byte[b.limit()]; + b.get(a); + _dataIn = new DataInputStream(new ByteArrayInputStream(a)); + } + catch(Exception e) + { + e.printStackTrace(); + } } //--- BytesMessage API diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index 67121f416a..4ca39cfcec 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -64,6 +64,7 @@ public class QpidMessage { // We us a byteBufferMessage as default _qpidityMessage = new ByteBufferMessage(); + System.out.println("Creating a bytes message"); _messageProperties = new HashMap<String, Object>(); // This is a newly created messsage so the data is empty _messageData = ByteBuffer.allocate(1024); @@ -325,8 +326,8 @@ public class QpidMessage * @param messageBody The buffer containing this message data */ protected void setMessageData(ByteBuffer messageBody) - { - _messageData = messageBody; + { + _messageData = messageBody.duplicate(); } /** @@ -389,7 +390,11 @@ public class QpidMessage // set the message data _qpidityMessage.clearData(); // we need to do a flip - _messageData.flip(); + //_messageData.flip(); + + System.out.println("_messageData POS " + _messageData.position()); + System.out.println("_messageData limit " + _messageData.limit()); + _qpidityMessage.appendData(_messageData); _qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties); } |
