summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-04 16:32:57 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-04 16:32:57 +0000
commitd17b47b7857279344a8af12a0c0b2c848bbaa81c (patch)
tree99d246e827265db5b25d424b127931dac0ed6598 /java
parentbffb8ad81c398598b8e431fce90bdbbaf13668df (diff)
downloadqpid-python-d17b47b7857279344a8af12a0c0b2c848bbaa81c.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java57
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java309
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java113
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java82
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java279
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java15
13 files changed, 689 insertions, 254 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
index f3ef74a2e9..bbcc17aca5 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
@@ -89,6 +89,27 @@ public class ClientSession implements org.apache.qpid.nclient.Session
public void messageRelease(Range... range) throws QpidException
{
//To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ public void messageFlowMode(String destination, short mode)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageFlow(String destination, short unit, long value)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean messageFlush(String destination)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageStop(String destination)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
}// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
index 7cc7659139..6efc136e50 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
@@ -225,7 +225,14 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
// start all the sessions
for (SessionImpl session : _sessions)
{
- session.start();
+ try
+ {
+ session.start();
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
_started = true;
}
@@ -249,7 +256,14 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
// stop all the sessions
for (SessionImpl session : _sessions)
{
- session.stop();
+ try
+ {
+ session.stop();
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
_started = false;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java
index 0a4c465815..e3838f0f84 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java
@@ -22,22 +22,29 @@ import org.apache.qpidity.QpidException;
import javax.jms.JMSException;
/**
- *Helper class for handling exceptions
+ * Helper class for handling exceptions
*/
public class ExceptionHelper
{
static public JMSException convertQpidExceptionToJMSException(Exception exception)
{
JMSException jmsException = null;
- if (exception instanceof QpidException)
+ if (!(exception instanceof JMSException))
{
- jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode());
+ if (exception instanceof QpidException)
+ {
+ jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode());
+ }
+ else
+ {
+ jmsException = new JMSException(exception.getMessage());
+ }
+ jmsException.setLinkedException(exception);
}
else
{
- jmsException = new JMSException(exception.getMessage());
+ jmsException = (JMSException) exception;
}
- jmsException.setLinkedException(exception);
return jmsException;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
index f75bbbb888..fd0e9ff9c9 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
@@ -32,12 +32,12 @@ public abstract class MessageActor
/**
* Used for debugging.
*/
- private static final Logger _logger = LoggerFactory.getLogger(MessageActor.class);
+ protected static final Logger _logger = LoggerFactory.getLogger(MessageActor.class);
/**
* Indicates whether this MessageActor is closed.
*/
- private boolean _isClosed = false;
+ protected boolean _isClosed = false;
/**
* This messageActor's session
@@ -49,6 +49,10 @@ public abstract class MessageActor
*/
DestinationImpl _destination;
+ /**
+ * Indicates that this actor is stopped
+ */
+ protected boolean _isStopped;
/**
* The ID of this actor for the session.
@@ -59,9 +63,9 @@ public abstract class MessageActor
//TODO define the parameters
- protected MessageActor()
+ protected MessageActor()
{
-
+
}
protected MessageActor(SessionImpl session, DestinationImpl destination)
@@ -87,7 +91,30 @@ public abstract class MessageActor
}
//-- protected methods
- /**
+
+ /**
+ * Stop this message actor
+ *
+ * @throws Exception If the consumer cannot be stopped due to some internal error.
+ */
+ protected void stop() throws Exception
+ {
+ _isStopped = true;
+ }
+
+ /**
+ * Start this message Actor
+ *
+ * @throws Exception If the consumer cannot be started due to some internal error.
+ */
+ protected void start() throws Exception
+ {
+
+ _isStopped = false;
+
+ }
+
+ /**
* Check if this MessageActor is not closed.
* <p> If the MessageActor is closed, throw a javax.jms.IllegalStateException.
* <p> The method is not synchronized, since MessageProducers can only be used by a single thread.
@@ -118,38 +145,32 @@ public abstract class MessageActor
{
if (!_isClosed)
{
- // close the underlying qpid resource
- /* try
+ try
{
- // Arnaud I can't see where this var is initialized
- // I assume it's the session
- //_qpidResource.close();
-
-
+ // cancle this destination
+ getSession().getQpidSession().messageCancel(getMessageActorID());
}
catch (QpidException e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }*/
-
- _session.close(); //is this correct ?
+ }
_isClosed = true;
}
}
/**
- * Get the associated session object.
+ * Get the associated session object.
*
* @return This Actor's Session.
*/
- protected SessionImpl getSession()
+ protected SessionImpl getSession()
{
return _session;
}
/**
* Get the ID of this actor within its session.
- *
+ *
* @return This actor ID.
*/
protected String getMessageActorID()
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
index be23e34529..0952d730e2 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
@@ -19,16 +19,21 @@ package org.apache.qpid.nclient.jms;
//import org.apache.qpid.nclient.api.MessageReceiver;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Message;
+import org.apache.qpid.nclient.jms.message.QpidMessage;
+import org.apache.qpid.nclient.impl.MessagePartListenerAdapter;
+import org.apache.qpid.nclient.MessagePartListener;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+
+import javax.jms.*;
/**
* Implementation of JMS message consumer
*/
public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
+ public static final short MESSAGE_FLOW_MODE = 0; // we use message flow mode
/**
* This MessageConsumer's messageselector.
@@ -53,9 +58,20 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
private MessageListener _messageListener;
/**
- * A warpper around the JSM message listener
+ * The synchronous message just delivered
*/
- private MessageListenerWrapper _messageListenerWrapper;
+ private QpidMessage _incomingMessage;
+
+ /**
+ * A lcok on the syncrhonous message
+ */
+ private final Object _incomingMessageLock = new Object();
+
+ /**
+ * Indicates that this consumer is receiving a synch message
+ */
+ private boolean _isReceiving = false;
+
//----- Constructors
/**
@@ -67,25 +83,43 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* @param noLocal If true inhibits the delivery of messages published by its own connection.
* @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
* If this value is null, a non-durable subscription is created.
- * @throws JMSException If the MessageProducerImpl cannot be created due to some internal error.
+ * @throws Exception If the MessageProducerImpl cannot be created due to some internal error.
*/
protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
- boolean noLocal, String subscriptionName) throws JMSException
+ boolean noLocal, String subscriptionName) throws Exception
{
super(session, destination);
_messageSelector = messageSelector;
_noLocal = noLocal;
_subscriptionName = subscriptionName;
- /*try
+ _isStopped = getSession().isStopped();
+ if (destination instanceof Queue)
{
- // TODO define the relevant options
- _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), Option.DURABLE);
- _qpidResource = _qpidReceiver;
+ // this is a queue we expect that this queue exists
+ // let's create a message part assembler
+ /**
+ * A Qpid message listener that pushes messages to this consumer session when this consumer is
+ * asynchronous or directly to this consumer when it is synchronously accessed.
+ */
+ MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
+ // we use the default options: EXCLUSIVE = false, PRE-ACCQUIRE and CONFIRM = off
+ if (_noLocal)
+ {
+ getSession().getQpidSession()
+ .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null,
+ Option.NO_LOCAL);
+ }
+ else
+ {
+ getSession().getQpidSession()
+ .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null);
+ }
}
- catch (QpidException e)
+ else
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }*/
+ // this is a topic we need to create a temporary queue for this consumer
+ // unless this is a durable subscriber
+ }
}
//----- Message consumer API
@@ -129,20 +163,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* @param messageListener The listener to which the messages are to be delivered
* @throws JMSException If setting the message listener fails due to some internal error.
*/
- public void setMessageListener(MessageListener messageListener) throws JMSException
+ public synchronized void setMessageListener(MessageListener messageListener) throws JMSException
{
+ // this method is synchronized as onMessage also access _messagelistener
+ // onMessage, getMessageListener and this method are the only synchronized methods
checkNotClosed();
_messageListener = messageListener;
- if( messageListener == null )
- {
-
- _messageListenerWrapper = null;
- }
- else
- {
- _messageListenerWrapper = new MessageListenerWrapper(this);
- //TODO _qpidReceiver.setAsynchronous(_messageListenerWrapper);
- }
}
/**
@@ -160,7 +186,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
/**
* Receive the next message that arrives within the specified timeout interval.
- * <p> This call blocks until a message arrives, the timeout expires, or this message consumer is closed.
+ * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
+ * is closed.
* <p> A timeout of zero never expires, and the call blocks indefinitely.
* <p> A timeout less than 0 throws a JMSException.
*
@@ -170,9 +197,20 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
*/
public Message receive(long timeout) throws JMSException
{
- Message message = null;
- // todo convert this message into a JMS one: _qpidReceiver.receive(-1);
- return message;
+ if (timeout < 0)
+ {
+ throw new JMSException("Invalid timeout value: " + timeout);
+ }
+ Message result;
+ try
+ {
+ result = internalReceive(timeout);
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ return result;
}
/**
@@ -183,44 +221,221 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
*/
public Message receiveNoWait() throws JMSException
{
- return receive(-1);
+ Message result;
+ try
+ {
+ result = internalReceive(-1);
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ return result;
}
-
// not public methods
+
+ /**
+ * Receive a synchronous message
+ * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
+ * is closed.
+ * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer
+ * is closed)
+ * <p> A timeout less than 0 returns the next message or null if one is not available.
+ *
+ * @param timeout The timeout value (in milliseconds)
+ * @return the next message or null if one is not available.
+ * @throws Exception If receiving the next message fails due to some internal error.
+ */
+ private Message internalReceive(long timeout) throws Exception
+ {
+ checkNotClosed();
+ if (_messageListener != null)
+ {
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
+ }
+
+ Message result = null;
+ synchronized (_incomingMessageLock)
+ {
+ // This indicate to the delivery thread to deliver the message to this consumer
+ // as it can happens that a message is delivered after a receive operation as returned.
+ _isReceiving = true;
+ boolean received = false;
+ if (!_isStopped)
+ {
+ // if this consumer is stopped then this will be call when starting
+ getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1);
+ received = getSession().getQpidSession().messageFlush(getMessageActorID());
+ }
+ if (!received && timeout < 0)
+ {
+ // this is a nowait and we havent received a message then we must immediatly return
+ result = null;
+ }
+ else
+ {
+ while (_incomingMessage == null && !_isClosed)
+ {
+ try
+ {
+ _incomingMessageLock.wait(timeout);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+ }
+ if (_incomingMessage != null)
+ {
+ result = _incomingMessage.getJMSMessage();
+ // tell the session that a message is inprocess
+ getSession().preProcessMessage(_incomingMessage);
+ // tell the session to acknowledge this message (if required)
+ getSession().acknowledgeMessage(_incomingMessage);
+ }
+ _incomingMessage = null;
+ // We now release any message received for this consumer
+ _isReceiving = false;
+ }
+ }
+ return result;
+ }
+
/**
- * Stop the delivery of messages to this receiver.
+ * Stop the delivery of messages to this consumer.
* <p>For asynchronous receiver, this operation blocks until the message listener
* finishes processing the current message,
*
- * @throws JMSException If the consumer cannot be stopped due to some internal error.
+ * @throws Exception If the consumer cannot be stopped due to some internal error.
*/
- void stop() throws JMSException
+ protected void stop() throws Exception
{
- /*try
+ getSession().getQpidSession().messageStop(getMessageActorID());
+ _isStopped = true;
+ }
+
+ /**
+ * Start the delivery of messages to this consumer.
+ *
+ * @throws Exception If the consumer cannot be started due to some internal error.
+ */
+ protected void start() throws Exception
+ {
+ synchronized (_incomingMessageLock)
{
- _qpidReceiver.stop();
+ _isStopped = false;
+ if (_isReceiving)
+ {
+ // there is a synch call waiting for a message to be delivered
+ // so tell the broker to deliver a message
+ getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1);
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ }
}
- catch (QpidException e)
+ }
+
+ /**
+ * Deliver a message to this consumer.
+ *
+ * @param message The message delivered to this consumer.
+ */
+ protected synchronized void onMessage(QpidMessage message)
+ {
+ try
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }*/
+ // if this consumer is synchronous then set the current message and
+ // notify the waiting thread
+ if (_messageListener == null)
+ {
+ synchronized (_incomingMessageLock)
+ {
+ if (_isReceiving)
+ {
+ _incomingMessage = message;
+ _incomingMessageLock.notify();
+ }
+ else
+ {
+ // this message has been received after a received as returned
+ // we need to release it
+ releaseMessage(message);
+ }
+ }
+ }
+ else
+ {
+ // This is an asynchronous message
+ // tell the session that a message is in process
+ getSession().preProcessMessage(message);
+ // If the session is transacted we need to ack the message first
+ // This is because a message is associated with its tx only when acked
+ if (getSession().getTransacted())
+ {
+ getSession().acknowledgeMessage(message);
+ }
+ // The JMS specs says:
+ /* The result of a listener throwing a RuntimeException depends on the session?s
+ * acknowledgment mode.
+ ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+ * will be immediately redelivered. The number of times a JMS provider will
+ * redeliver the same message before giving up is provider-dependent.
+ ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+ * --- Transacted Session - the next message for the listener is delivered.
+ *
+ * The number of time we try redelivering the message is 0
+ **/
+ try
+ {
+ _messageListener.onMessage(message.getJMSMessage());
+ }
+ catch (RuntimeException re)
+ {
+ // do nothing as this message will not be redelivered
+ }
+ // If the session has been recovered we then need to redelivered this message
+ if (getSession().isInRecovery())
+ {
+ releaseMessage(message);
+ }
+ else if (!getSession().getTransacted())
+ {
+ // Tell the jms Session to ack this message if required
+ getSession().acknowledgeMessage(message);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
}
/**
- * Start the delivery of messages to this consumer.
+ * Release a message
*
- * @throws JMSException If the consumer cannot be started due to some internal error.
+ * @param message The message to be released
+ * @throws JMSException If the message cannot be released due to some internal error.
*/
- void start() throws JMSException
+ private void releaseMessage(QpidMessage message) throws JMSException
{
- /*try
+ Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ try
{
- _qpidReceiver.start();
+ getSession().getQpidSession().messageRelease(range);
}
catch (QpidException e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }*/
+ // notify the Exception listener
+ if (getSession().getConnection().getExceptionListener() != null)
+ {
+ getSession().getConnection().getExceptionListener()
+ .onException(ExceptionHelper.convertQpidExceptionToJMSException(e));
+ }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Excpetion when releasing message " + message, e);
+ }
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
deleted file mode 100644
index 0fce102226..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient.jms;
-
-import org.apache.qpid.nclient.MessageListener;
-import org.apache.qpid.nclient.jms.message.QpidMessage;
-import org.apache.qpid.nclient.jms.message.AbstractJMSMessage;
-import org.apache.qpidity.api.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a wrapper for the JMS message listener
- */
-public class MessageListenerWrapper implements MessageListener
-{
- /**
- * Used for debugging.
- */
- private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
-
- /**
- * This message listener consumer
- */
- MessageConsumerImpl _consumer = null;
-
- /**
- * The jms listener of this consumer.
- */
- javax.jms.MessageListener _jmsMessageLstener = null;
-
- //---- constructor
- /**
- * Create a message listener wrapper for a given consumer
- *
- * @param consumer The consumer of this listener
- */
- public MessageListenerWrapper(MessageConsumerImpl consumer)
- {
- _consumer = consumer;
- }
-
- //---- org.apache.qpid.nclient.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
- {
- try
- {
- // tell the session that a message is in process
- _consumer.getSession().preProcessMessage((QpidMessage) message);
- //TODO build the JMS message form a qpid message
- AbstractJMSMessage jmsMessage = null;
- // If the session is transacted we need to ack the message first
- // This is because a message is associated with its tx only when acked
- if (_consumer.getSession().getTransacted())
- {
- _consumer.getSession().acknowledgeMessage(jmsMessage);
- }
- // The JMS specs says:
- /* The result of a listener throwing a RuntimeException depends on the session�s
- * acknowledgment mode.
- � --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
- * will be immediately redelivered. The number of times a JMS provider will
- * redeliver the same message before giving up is provider-dependent.
- � --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
- * --- Transacted Session - the next message for the listener is delivered.
- *
- * The number of time we try redelivering the message is 0
- **/
- try
- {
- _jmsMessageLstener.onMessage(jmsMessage);
- }
- catch (RuntimeException re)
- {
- // do nothing as this message will not be redelivered
- }
- // If the session has been recovered we then need to redelivered this message
- if (_consumer.getSession().isInRecovery())
- {
- //message.release();
- }
- // Tell the jms Session to ack this message if required
- else if (!_consumer.getSession().getTransacted())
- {
- _consumer.getSession().acknowledgeMessage(jmsMessage);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage());
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java
new file mode 100644
index 0000000000..ece0905b0f
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpid.nclient.jms.message.QpidMessage;
+import org.apache.qpidity.api.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p> When asynchronous, upon receive of a message this listener delegate the dispatching to its session.
+ * This is for guarantying that asynch messages are sequentially processed within their session.
+ * <p> when used synchonously, messages are dispatched to the receiver itself.
+ */
+public class QpidMessageListener implements MessageListener
+{
+ /**
+ * Used for debugging.
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
+
+ /**
+ * This message listener consumer
+ */
+ MessageConsumerImpl _consumer = null;
+
+ //---- constructor
+ /**
+ * Create a message listener wrapper for a given consumer
+ *
+ * @param consumer The consumer of this listener
+ */
+ public QpidMessageListener(MessageConsumerImpl consumer)
+ {
+ _consumer = consumer;
+ }
+
+ //---- org.apache.qpid.nclient.MessagePartListener API
+ /**
+ * Deliver a message to the listener.
+ *
+ * @param message The message delivered to the listner.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ //convert this message into a JMS one
+ QpidMessage jmsMessage = null; // todo
+ // if consumer is asynchronous then send this message to its session.
+ if( _consumer.getMessageListener() != null )
+ {
+ _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
+ }
+ else
+ {
+ // deliver this message to the consumer itself
+ _consumer.onMessage(jmsMessage);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java
index d45f2b54f1..3c57ee727f 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java
@@ -33,9 +33,9 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei
* @param session The session from which the QueueReceiverImpl is instantiated.
* @param queue The default queue for this QueueReceiverImpl.
* @param messageSelector the message selector for this QueueReceiverImpl.
- * @throws JMSException If the QueueReceiverImpl cannot be created due to some internal error.
+ * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error.
*/
- protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException
+ protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception
{
super(session, (DestinationImpl) queue, messageSelector, false, null);
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
index ed97145c04..89a4d963eb 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
@@ -123,7 +123,16 @@ public class QueueSessionImpl extends SessionImpl implements QueueSession
{
checkNotClosed();
checkDestination(queue);
- return new QueueReceiverImpl(this, queue, messageSelector);
+ QueueReceiver receiver;
+ try
+ {
+ receiver = new QueueReceiverImpl(this, queue, messageSelector);
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ return receiver;
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
index 1aca96582f..22136814a1 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.nclient.jms.message.*;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -28,9 +29,9 @@ import javax.jms.Session;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.io.Serializable;
-import java.util.Vector;
import java.util.LinkedList;
import java.util.HashMap;
+import java.util.ArrayList;
/**
* Implementation of the JMS Session interface
@@ -43,43 +44,41 @@ public class SessionImpl implements Session
private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
/**
- * A queue for incoming messages including synch and asych messages.
+ * A queue for incoming asynch messages.
*/
- private LinkedList<QpidMessage> _incomingAsynchronousMessages = new LinkedList<QpidMessage>();
+ private final LinkedList<IncomingMessage> _incomingAsynchronousMessages = new LinkedList<IncomingMessage>();
- //--- Session thread locking
+ //--- MessageDispatcherThread and Session locking
/**
- * indicates that the sessionThread has stopped
+ * indicates that the MessageDispatcherThread has stopped
*/
private boolean _hasStopped = false;
/**
- * lock for the sessionThread to wait until the session is stopped
+ * lock for the MessageDispatcherThread to wait until the session is stopped
*/
- private Object _stoppingLock = new Object();
+ private final Object _stoppingLock = new Object();
/**
- * lock for the stopper thread to wait on when the sessionThread is stopping
+ * lock for the stopper thread to wait on when the MessageDispatcherThread is stopping
*/
- private Object _stoppingJoin = new Object();
+ private final Object _stoppingJoin = new Object();
/**
* thread to dispatch messages to async consumers
*/
private MessageDispatcherThread _messageDispatcherThread = null;
-
+ //----END
/**
* The messageActors of this session.
*/
- private HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
+ private final HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
/**
* All the not yet acknoledged messages
- * We use a vector as access to this list has to be synchronised
- * This is because messages are acked from messagelistner threads
*/
- private Vector<QpidMessage> _unacknowledgedMessages = new Vector<QpidMessage>();
+ private final ArrayList<QpidMessage> _unacknowledgedMessages = new ArrayList<QpidMessage>();
/**
* Indicates whether this session is closed.
@@ -156,10 +155,8 @@ public class SessionImpl implements Session
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- // Create and start a MessageDispatcherThread
- // This thread is dispatching messages to the async consumers
- _messageDispatcherThread = new MessageDispatcherThread();
- _messageDispatcherThread.start();
+ // init the message dispatcher.
+ initMessageDispatcherThread();
}
//--- javax.jms.Session API
@@ -379,10 +376,9 @@ public class SessionImpl implements Session
// that will stop the sessionThread
if (_isStopped)
{
- start();
+ startDispatchThread();
}
-
- //stop the sessionThread
+ //notify the sessionThread
synchronized (_incomingAsynchronousMessages)
{
_incomingAsynchronousMessages.notifyAll();
@@ -401,8 +397,9 @@ public class SessionImpl implements Session
// from now all the session methods will throw a IllegalStateException
_isClosed = true;
// close all the actors
- closeAllActors();
+ closeAllMessageActors();
_messageActors.clear();
+ // We may have a thread trying to add a message
synchronized (_incomingAsynchronousMessages)
{
_incomingAsynchronousMessages.clear();
@@ -417,7 +414,6 @@ public class SessionImpl implements Session
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
-
}
}
@@ -451,8 +447,17 @@ public class SessionImpl implements Session
// release all unack messages
for (QpidMessage message : _unacknowledgedMessages)
{
- // release all those messages
- //Todo: message.getQpidMEssage.release();
+ // release this message
+ Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ try
+ {
+ getQpidSession().messageRelease(range);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // TODO We can be a little bit cleverer and build a set of ranges
}
}
@@ -559,12 +564,20 @@ public class SessionImpl implements Session
* @throws InvalidDestinationException If an invalid destination is specified.
* @throws InvalidSelectorException If the message selector is invalid.
*/
- public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws
- JMSException
+ public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
{
checkNotClosed();
checkDestination(destination);
- MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
+ MessageConsumerImpl consumer = null;
+ try
+ {
+ consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
// register this actor with the session
_messageActors.put(consumer.getMessageActorID(), consumer);
return consumer;
@@ -647,12 +660,21 @@ public class SessionImpl implements Session
* @throws InvalidDestinationException If an invalid topic is specified.
* @throws InvalidSelectorException If the message selector is invalid.
*/
- public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector,
- boolean noLocal) throws JMSException
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws JMSException
{
checkNotClosed();
checkDestination(topic);
- TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
+ TopicSubscriberImpl subscriber;
+ try
+ {
+ subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal,
+ _connection.getClientID() + ":" + name);
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
_messageActors.put(subscriber.getMessageActorID(), subscriber);
return subscriber;
}
@@ -739,43 +761,71 @@ public class SessionImpl implements Session
* Remove a message actor form this session
* <p> This method is called when an actor is independently closed.
*
- * @param actor The closed actor.
+ * @param messageActor The closed actor.
+ */
+ protected void closeMessageActor(MessageActor messageActor)
+ {
+ _messageActors.remove(messageActor.getMessageActorID());
+ }
+
+ /**
+ * Idincates whether this session is stopped.
+ *
+ * @return True is this session is stopped, false otherwise.
*/
- protected void closeMessageActor(MessageActor actor)
+ protected boolean isStopped()
{
- _messageActors.remove(actor);
+ return _isStopped;
}
/**
* Start the flow of message to this session.
*
- * @throws JMSException If starting the session fails due to some communication error.
+ * @throws Exception If starting the session fails due to some communication error.
*/
- protected void start() throws JMSException
+ protected synchronized void start() throws Exception
{
if (_isStopped)
{
- synchronized (_stoppingLock)
+ // start all the MessageActors
+ for (MessageActor messageActor : _messageActors.values())
{
- _isStopped = false;
- _stoppingLock.notify();
- }
- synchronized (_stoppingJoin)
- {
- _hasStopped = false;
+ messageActor.start();
}
+ startDispatchThread();
+ }
+ }
+
+ /**
+ * Restart delivery of asynch messages
+ */
+ private void startDispatchThread()
+ {
+ synchronized (_stoppingLock)
+ {
+ _isStopped = false;
+ _stoppingLock.notify();
+ }
+ synchronized (_stoppingJoin)
+ {
+ _hasStopped = false;
}
}
/**
* Stop the flow of message to this session.
*
- * @throws JMSException If stopping the session fails due to some communication error.
+ * @throws Exception If stopping the session fails due to some communication error.
*/
- protected void stop() throws JMSException
+ protected synchronized void stop() throws Exception
{
if (!_isClosing && !_isStopped)
{
+ // stop all the MessageActors
+ for (MessageActor messageActor : _messageActors.values())
+ {
+ messageActor.stop();
+ }
synchronized (_incomingAsynchronousMessages)
{
_isStopped = true;
@@ -811,6 +861,21 @@ public class SessionImpl implements Session
}
/**
+ * Dispatch this message to this session asynchronous consumers
+ *
+ * @param consumerID The consumer ID.
+ * @param message The message to be dispatched.
+ */
+ public void dispatchMessage(String consumerID, QpidMessage message)
+ {
+ synchronized (_incomingAsynchronousMessages)
+ {
+ _incomingAsynchronousMessages.addLast(new IncomingMessage(consumerID, message));
+ _incomingAsynchronousMessages.notifyAll();
+ }
+ }
+
+ /**
* Indicate whether this session is recovering .
*
* @return true if this session is recovering.
@@ -850,7 +915,8 @@ public class SessionImpl implements Session
{
if (dest == null)
{
- throw new javax.jms.InvalidDestinationException("Invalid destination specified: " + dest, "Invalid destination");
+ throw new javax.jms.InvalidDestinationException("Invalid destination specified: " + dest,
+ "Invalid destination");
}
}
@@ -869,14 +935,25 @@ public class SessionImpl implements Session
{
// messages will be acknowldeged by the client application.
// store this message for acknowledging it afterward
- _unacknowledgedMessages.add(message);
+ synchronized (_unacknowledgedMessages)
+ {
+ _unacknowledgedMessages.add(message);
+ }
}
else
{
// acknowledge this message
- // TODO: message.acknowledgeQpidMessge();
+ Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ try
+ {
+ getQpidSession().messageAcknowledge(range);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
- //TODO: Implement DUPS OK heuristic
+ //tobedone: Implement DUPS OK heuristic
}
/**
@@ -895,13 +972,25 @@ public class SessionImpl implements Session
checkNotClosed();
if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
- for (QpidMessage message : _unacknowledgedMessages)
+ synchronized (_unacknowledgedMessages)
{
- // acknowledge this message
- // TODO: message.acknowledgeQpidMessge();
+ for (QpidMessage message : _unacknowledgedMessages)
+ {
+ // acknowledge this message
+ Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ try
+ {
+ getQpidSession().messageAcknowledge(range);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // TODO We can be a little bit cleverer and build a set of ranges
+ }
+ //empty the list of unack messages
+ _unacknowledgedMessages.clear();
}
- //empty the list of unack messages
- _unacknowledgedMessages.clear();
}
//else there is no effect
}
@@ -916,13 +1005,23 @@ public class SessionImpl implements Session
return _qpidSession;
}
+ /**
+ * Get this session's conneciton
+ *
+ * @return This session's connection
+ */
+ protected ConnectionImpl getConnection()
+ {
+ return _connection;
+ }
+
//------ Private Methods
/**
* Close the producer and the consumers of this session
*
* @throws JMSException If one of the MessaeActor cannot be closed due to some internal error.
*/
- private void closeAllActors() throws JMSException
+ private void closeAllMessageActors() throws JMSException
{
for (MessageActor messageActor : _messageActors.values())
{
@@ -930,9 +1029,66 @@ public class SessionImpl implements Session
}
}
+ /**
+ * create and start the MessageDispatcherThread.
+ */
+ private synchronized void initMessageDispatcherThread()
+ {
+ // Create and start a MessageDispatcherThread
+ // This thread is dispatching messages to the async consumers
+ _messageDispatcherThread = new MessageDispatcherThread();
+ _messageDispatcherThread.start();
+ }
+
//------ Inner classes
/**
+ * Convenient class for storing incoming messages associated with a consumer ID.
+ * <p> Those messages are enqueued in _incomingAsynchronousMessages
+ */
+ private class IncomingMessage
+ {
+ // The consumer ID
+ private String _consumerId;
+ // The message
+ private QpidMessage _message;
+
+ //-- constructor
+ /**
+ * Creat a new incoming message
+ *
+ * @param consumerId The consumer ID
+ * @param message The message to be delivered
+ */
+ IncomingMessage(String consumerId, QpidMessage message)
+ {
+ _consumerId = consumerId;
+ _message = message;
+ }
+
+ // Getters
+ /**
+ * Get the consumer ID
+ *
+ * @return The consumer ID for this message
+ */
+ public String getConsumerId()
+ {
+ return _consumerId;
+ }
+
+ /**
+ * Get the message.
+ *
+ * @return The message.
+ */
+ public QpidMessage getMessage()
+ {
+ return _message;
+ }
+ }
+
+ /**
* A MessageDispatcherThread is attached to every SessionImpl.
* <p/>
* This thread is responsible for removing messages from m_incomingMessages and
@@ -957,9 +1113,8 @@ public class SessionImpl implements Session
*/
public void run()
{
- QpidMessage message = null;
-
- // deliver messages to consumers until the stop flag is set.
+ IncomingMessage message = null;
+ // deliver messages to asynchronous consumers until the stop flag is set.
do
{
// When this session is not closing and and stopped
@@ -1012,19 +1167,19 @@ public class SessionImpl implements Session
MessageConsumerImpl mc;
synchronized (_messageActors)
{
- mc = null; // todo _messageActors.get(message.consumerID);
+ mc = (MessageConsumerImpl) _messageActors.get(message.getConsumerId());
}
- boolean consumed = false;
if (mc != null)
{
try
{
- // todo call onMessage
+ mc.onMessage(message.getMessage());
}
catch (RuntimeException t)
{
// the JMS specification tells us to flag that to the client!
- _logger.error("Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
+ _logger.error(
+ "Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
index 8537fd1268..39744697a0 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
@@ -139,6 +139,15 @@ public class TopicSessionImpl extends SessionImpl implements TopicSession
{
checkNotClosed();
checkDestination(topic);
- return new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null);
- }
+ TopicSubscriber topicSubscriber;
+ try
+ {
+ topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null);
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ return topicSubscriber;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java
index 23007a5839..1dfee37b9b 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java
@@ -36,10 +36,10 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
* @param noLocal If true inhibits the delivery of messages published by its own connection.
* @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
* If this value is null, a non-durable subscription is created.
- * @throws javax.jms.JMSException If the TopicSubscriberImpl cannot be created due to internal error.
+ * @throws Exception If the TopicSubscriberImpl cannot be created due to internal error.
*/
protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal,
- String subscriptionName) throws JMSException
+ String subscriptionName) throws Exception
{
super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName);
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
index 92a32f9bf4..02a43b0414 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
@@ -21,6 +21,7 @@
package org.apache.qpid.nclient.jms.message;
import javax.jms.JMSException;
+import javax.jms.Message;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.ContentHeaderProperties;
@@ -132,4 +133,18 @@ public class QpidMessage
{
return getPropertyHeaders().getTimestamp(propertyName);
}
+
+ public Message getJMSMessage()
+ {
+ // todo
+ return null;
+ }
+
+ public Long getMessageID()
+ {
+ //todo
+ return new Long(1);
+ }
}
+
+