diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-04 16:32:57 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-04 16:32:57 +0000 |
| commit | d17b47b7857279344a8af12a0c0b2c848bbaa81c (patch) | |
| tree | 99d246e827265db5b25d424b127931dac0ed6598 /java | |
| parent | bffb8ad81c398598b8e431fce90bdbbaf13668df (diff) | |
| download | qpid-python-d17b47b7857279344a8af12a0c0b2c848bbaa81c.tar.gz | |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
13 files changed, 689 insertions, 254 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java index f3ef74a2e9..bbcc17aca5 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -89,6 +89,27 @@ public class ClientSession implements org.apache.qpid.nclient.Session public void messageRelease(Range... range) throws QpidException { //To change body of implemented methods use File | Settings | File Templates. + } + + + public void messageFlowMode(String destination, short mode) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void messageFlow(String destination, short unit, long value) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean messageFlush(String destination) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void messageStop(String destination) + { + //To change body of implemented methods use File | Settings | File Templates. }// ----------------------------------------------- // Local transaction methods // ---------------------------------------------- diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java index 7cc7659139..6efc136e50 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java @@ -225,7 +225,14 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect // start all the sessions for (SessionImpl session : _sessions) { - session.start(); + try + { + session.start(); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } _started = true; } @@ -249,7 +256,14 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect // stop all the sessions for (SessionImpl session : _sessions) { - session.stop(); + try + { + session.stop(); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } _started = false; } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java index 0a4c465815..e3838f0f84 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java @@ -22,22 +22,29 @@ import org.apache.qpidity.QpidException; import javax.jms.JMSException; /** - *Helper class for handling exceptions + * Helper class for handling exceptions */ public class ExceptionHelper { static public JMSException convertQpidExceptionToJMSException(Exception exception) { JMSException jmsException = null; - if (exception instanceof QpidException) + if (!(exception instanceof JMSException)) { - jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode()); + if (exception instanceof QpidException) + { + jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode()); + } + else + { + jmsException = new JMSException(exception.getMessage()); + } + jmsException.setLinkedException(exception); } else { - jmsException = new JMSException(exception.getMessage()); + jmsException = (JMSException) exception; } - jmsException.setLinkedException(exception); return jmsException; } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java index f75bbbb888..fd0e9ff9c9 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java @@ -32,12 +32,12 @@ public abstract class MessageActor /** * Used for debugging. */ - private static final Logger _logger = LoggerFactory.getLogger(MessageActor.class); + protected static final Logger _logger = LoggerFactory.getLogger(MessageActor.class); /** * Indicates whether this MessageActor is closed. */ - private boolean _isClosed = false; + protected boolean _isClosed = false; /** * This messageActor's session @@ -49,6 +49,10 @@ public abstract class MessageActor */ DestinationImpl _destination; + /** + * Indicates that this actor is stopped + */ + protected boolean _isStopped; /** * The ID of this actor for the session. @@ -59,9 +63,9 @@ public abstract class MessageActor //TODO define the parameters - protected MessageActor() + protected MessageActor() { - + } protected MessageActor(SessionImpl session, DestinationImpl destination) @@ -87,7 +91,30 @@ public abstract class MessageActor } //-- protected methods - /** + + /** + * Stop this message actor + * + * @throws Exception If the consumer cannot be stopped due to some internal error. + */ + protected void stop() throws Exception + { + _isStopped = true; + } + + /** + * Start this message Actor + * + * @throws Exception If the consumer cannot be started due to some internal error. + */ + protected void start() throws Exception + { + + _isStopped = false; + + } + + /** * Check if this MessageActor is not closed. * <p> If the MessageActor is closed, throw a javax.jms.IllegalStateException. * <p> The method is not synchronized, since MessageProducers can only be used by a single thread. @@ -118,38 +145,32 @@ public abstract class MessageActor { if (!_isClosed) { - // close the underlying qpid resource - /* try + try { - // Arnaud I can't see where this var is initialized - // I assume it's the session - //_qpidResource.close(); - - + // cancle this destination + getSession().getQpidSession().messageCancel(getMessageActorID()); } catch (QpidException e) { throw ExceptionHelper.convertQpidExceptionToJMSException(e); - }*/ - - _session.close(); //is this correct ? + } _isClosed = true; } } /** - * Get the associated session object. + * Get the associated session object. * * @return This Actor's Session. */ - protected SessionImpl getSession() + protected SessionImpl getSession() { return _session; } /** * Get the ID of this actor within its session. - * + * * @return This actor ID. */ protected String getMessageActorID() diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java index be23e34529..0952d730e2 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -19,16 +19,21 @@ package org.apache.qpid.nclient.jms; //import org.apache.qpid.nclient.api.MessageReceiver; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Message; +import org.apache.qpid.nclient.jms.message.QpidMessage; +import org.apache.qpid.nclient.impl.MessagePartListenerAdapter; +import org.apache.qpid.nclient.MessagePartListener; +import org.apache.qpidity.Range; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; + +import javax.jms.*; /** * Implementation of JMS message consumer */ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { + public static final short MESSAGE_FLOW_MODE = 0; // we use message flow mode /** * This MessageConsumer's messageselector. @@ -53,9 +58,20 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer private MessageListener _messageListener; /** - * A warpper around the JSM message listener + * The synchronous message just delivered */ - private MessageListenerWrapper _messageListenerWrapper; + private QpidMessage _incomingMessage; + + /** + * A lcok on the syncrhonous message + */ + private final Object _incomingMessageLock = new Object(); + + /** + * Indicates that this consumer is receiving a synch message + */ + private boolean _isReceiving = false; + //----- Constructors /** @@ -67,25 +83,43 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @param noLocal If true inhibits the delivery of messages published by its own connection. * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber. * If this value is null, a non-durable subscription is created. - * @throws JMSException If the MessageProducerImpl cannot be created due to some internal error. + * @throws Exception If the MessageProducerImpl cannot be created due to some internal error. */ protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector, - boolean noLocal, String subscriptionName) throws JMSException + boolean noLocal, String subscriptionName) throws Exception { super(session, destination); _messageSelector = messageSelector; _noLocal = noLocal; _subscriptionName = subscriptionName; - /*try + _isStopped = getSession().isStopped(); + if (destination instanceof Queue) { - // TODO define the relevant options - _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), Option.DURABLE); - _qpidResource = _qpidReceiver; + // this is a queue we expect that this queue exists + // let's create a message part assembler + /** + * A Qpid message listener that pushes messages to this consumer session when this consumer is + * asynchronous or directly to this consumer when it is synchronously accessed. + */ + MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this)); + // we use the default options: EXCLUSIVE = false, PRE-ACCQUIRE and CONFIRM = off + if (_noLocal) + { + getSession().getQpidSession() + .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null, + Option.NO_LOCAL); + } + else + { + getSession().getQpidSession() + .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null); + } } - catch (QpidException e) + else { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - }*/ + // this is a topic we need to create a temporary queue for this consumer + // unless this is a durable subscriber + } } //----- Message consumer API @@ -129,20 +163,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @param messageListener The listener to which the messages are to be delivered * @throws JMSException If setting the message listener fails due to some internal error. */ - public void setMessageListener(MessageListener messageListener) throws JMSException + public synchronized void setMessageListener(MessageListener messageListener) throws JMSException { + // this method is synchronized as onMessage also access _messagelistener + // onMessage, getMessageListener and this method are the only synchronized methods checkNotClosed(); _messageListener = messageListener; - if( messageListener == null ) - { - - _messageListenerWrapper = null; - } - else - { - _messageListenerWrapper = new MessageListenerWrapper(this); - //TODO _qpidReceiver.setAsynchronous(_messageListenerWrapper); - } } /** @@ -160,7 +186,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer /** * Receive the next message that arrives within the specified timeout interval. - * <p> This call blocks until a message arrives, the timeout expires, or this message consumer is closed. + * <p> This call blocks until a message arrives, the timeout expires, or this message consumer + * is closed. * <p> A timeout of zero never expires, and the call blocks indefinitely. * <p> A timeout less than 0 throws a JMSException. * @@ -170,9 +197,20 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer */ public Message receive(long timeout) throws JMSException { - Message message = null; - // todo convert this message into a JMS one: _qpidReceiver.receive(-1); - return message; + if (timeout < 0) + { + throw new JMSException("Invalid timeout value: " + timeout); + } + Message result; + try + { + result = internalReceive(timeout); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** @@ -183,44 +221,221 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer */ public Message receiveNoWait() throws JMSException { - return receive(-1); + Message result; + try + { + result = internalReceive(-1); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } - // not public methods + + /** + * Receive a synchronous message + * <p> This call blocks until a message arrives, the timeout expires, or this message consumer + * is closed. + * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer + * is closed) + * <p> A timeout less than 0 returns the next message or null if one is not available. + * + * @param timeout The timeout value (in milliseconds) + * @return the next message or null if one is not available. + * @throws Exception If receiving the next message fails due to some internal error. + */ + private Message internalReceive(long timeout) throws Exception + { + checkNotClosed(); + if (_messageListener != null) + { + throw new javax.jms.IllegalStateException("A listener has already been set."); + } + + Message result = null; + synchronized (_incomingMessageLock) + { + // This indicate to the delivery thread to deliver the message to this consumer + // as it can happens that a message is delivered after a receive operation as returned. + _isReceiving = true; + boolean received = false; + if (!_isStopped) + { + // if this consumer is stopped then this will be call when starting + getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1); + received = getSession().getQpidSession().messageFlush(getMessageActorID()); + } + if (!received && timeout < 0) + { + // this is a nowait and we havent received a message then we must immediatly return + result = null; + } + else + { + while (_incomingMessage == null && !_isClosed) + { + try + { + _incomingMessageLock.wait(timeout); + } + catch (InterruptedException e) + { + // do nothing + } + } + if (_incomingMessage != null) + { + result = _incomingMessage.getJMSMessage(); + // tell the session that a message is inprocess + getSession().preProcessMessage(_incomingMessage); + // tell the session to acknowledge this message (if required) + getSession().acknowledgeMessage(_incomingMessage); + } + _incomingMessage = null; + // We now release any message received for this consumer + _isReceiving = false; + } + } + return result; + } + /** - * Stop the delivery of messages to this receiver. + * Stop the delivery of messages to this consumer. * <p>For asynchronous receiver, this operation blocks until the message listener * finishes processing the current message, * - * @throws JMSException If the consumer cannot be stopped due to some internal error. + * @throws Exception If the consumer cannot be stopped due to some internal error. */ - void stop() throws JMSException + protected void stop() throws Exception { - /*try + getSession().getQpidSession().messageStop(getMessageActorID()); + _isStopped = true; + } + + /** + * Start the delivery of messages to this consumer. + * + * @throws Exception If the consumer cannot be started due to some internal error. + */ + protected void start() throws Exception + { + synchronized (_incomingMessageLock) { - _qpidReceiver.stop(); + _isStopped = false; + if (_isReceiving) + { + // there is a synch call waiting for a message to be delivered + // so tell the broker to deliver a message + getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1); + getSession().getQpidSession().messageFlush(getMessageActorID()); + } } - catch (QpidException e) + } + + /** + * Deliver a message to this consumer. + * + * @param message The message delivered to this consumer. + */ + protected synchronized void onMessage(QpidMessage message) + { + try { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - }*/ + // if this consumer is synchronous then set the current message and + // notify the waiting thread + if (_messageListener == null) + { + synchronized (_incomingMessageLock) + { + if (_isReceiving) + { + _incomingMessage = message; + _incomingMessageLock.notify(); + } + else + { + // this message has been received after a received as returned + // we need to release it + releaseMessage(message); + } + } + } + else + { + // This is an asynchronous message + // tell the session that a message is in process + getSession().preProcessMessage(message); + // If the session is transacted we need to ack the message first + // This is because a message is associated with its tx only when acked + if (getSession().getTransacted()) + { + getSession().acknowledgeMessage(message); + } + // The JMS specs says: + /* The result of a listener throwing a RuntimeException depends on the session?s + * acknowledgment mode. + ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message + * will be immediately redelivered. The number of times a JMS provider will + * redeliver the same message before giving up is provider-dependent. + ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. + * --- Transacted Session - the next message for the listener is delivered. + * + * The number of time we try redelivering the message is 0 + **/ + try + { + _messageListener.onMessage(message.getJMSMessage()); + } + catch (RuntimeException re) + { + // do nothing as this message will not be redelivered + } + // If the session has been recovered we then need to redelivered this message + if (getSession().isInRecovery()) + { + releaseMessage(message); + } + else if (!getSession().getTransacted()) + { + // Tell the jms Session to ack this message if required + getSession().acknowledgeMessage(message); + } + } + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage()); + } } /** - * Start the delivery of messages to this consumer. + * Release a message * - * @throws JMSException If the consumer cannot be started due to some internal error. + * @param message The message to be released + * @throws JMSException If the message cannot be released due to some internal error. */ - void start() throws JMSException + private void releaseMessage(QpidMessage message) throws JMSException { - /*try + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + try { - _qpidReceiver.start(); + getSession().getQpidSession().messageRelease(range); } catch (QpidException e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - }*/ + // notify the Exception listener + if (getSession().getConnection().getExceptionListener() != null) + { + getSession().getConnection().getExceptionListener() + .onException(ExceptionHelper.convertQpidExceptionToJMSException(e)); + } + if (_logger.isDebugEnabled()) + { + _logger.debug("Excpetion when releasing message " + message, e); + } + } } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java deleted file mode 100644 index 0fce102226..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java +++ /dev/null @@ -1,113 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.jms; - -import org.apache.qpid.nclient.MessageListener; -import org.apache.qpid.nclient.jms.message.QpidMessage; -import org.apache.qpid.nclient.jms.message.AbstractJMSMessage; -import org.apache.qpidity.api.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a wrapper for the JMS message listener - */ -public class MessageListenerWrapper implements MessageListener -{ - /** - * Used for debugging. - */ - private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); - - /** - * This message listener consumer - */ - MessageConsumerImpl _consumer = null; - - /** - * The jms listener of this consumer. - */ - javax.jms.MessageListener _jmsMessageLstener = null; - - //---- constructor - /** - * Create a message listener wrapper for a given consumer - * - * @param consumer The consumer of this listener - */ - public MessageListenerWrapper(MessageConsumerImpl consumer) - { - _consumer = consumer; - } - - //---- org.apache.qpid.nclient.MessagePartListener API - /** - * Deliver a message to the listener. - * - * @param message The message delivered to the listner. - */ - public void onMessage(Message message) - { - try - { - // tell the session that a message is in process - _consumer.getSession().preProcessMessage((QpidMessage) message); - //TODO build the JMS message form a qpid message - AbstractJMSMessage jmsMessage = null; - // If the session is transacted we need to ack the message first - // This is because a message is associated with its tx only when acked - if (_consumer.getSession().getTransacted()) - { - _consumer.getSession().acknowledgeMessage(jmsMessage); - } - // The JMS specs says: - /* The result of a listener throwing a RuntimeException depends on the session�s - * acknowledgment mode. - � --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message - * will be immediately redelivered. The number of times a JMS provider will - * redeliver the same message before giving up is provider-dependent. - � --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. - * --- Transacted Session - the next message for the listener is delivered. - * - * The number of time we try redelivering the message is 0 - **/ - try - { - _jmsMessageLstener.onMessage(jmsMessage); - } - catch (RuntimeException re) - { - // do nothing as this message will not be redelivered - } - // If the session has been recovered we then need to redelivered this message - if (_consumer.getSession().isInRecovery()) - { - //message.release(); - } - // Tell the jms Session to ack this message if required - else if (!_consumer.getSession().getTransacted()) - { - _consumer.getSession().acknowledgeMessage(jmsMessage); - } - } - catch (Exception e) - { - throw new RuntimeException(e.getMessage()); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java new file mode 100644 index 0000000000..ece0905b0f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java @@ -0,0 +1,82 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms; + +import org.apache.qpid.nclient.MessageListener; +import org.apache.qpid.nclient.jms.message.QpidMessage; +import org.apache.qpidity.api.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> When asynchronous, upon receive of a message this listener delegate the dispatching to its session. + * This is for guarantying that asynch messages are sequentially processed within their session. + * <p> when used synchonously, messages are dispatched to the receiver itself. + */ +public class QpidMessageListener implements MessageListener +{ + /** + * Used for debugging. + */ + private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); + + /** + * This message listener consumer + */ + MessageConsumerImpl _consumer = null; + + //---- constructor + /** + * Create a message listener wrapper for a given consumer + * + * @param consumer The consumer of this listener + */ + public QpidMessageListener(MessageConsumerImpl consumer) + { + _consumer = consumer; + } + + //---- org.apache.qpid.nclient.MessagePartListener API + /** + * Deliver a message to the listener. + * + * @param message The message delivered to the listner. + */ + public void onMessage(Message message) + { + try + { + //convert this message into a JMS one + QpidMessage jmsMessage = null; // todo + // if consumer is asynchronous then send this message to its session. + if( _consumer.getMessageListener() != null ) + { + _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); + } + else + { + // deliver this message to the consumer itself + _consumer.onMessage(jmsMessage); + } + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage()); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java index d45f2b54f1..3c57ee727f 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java @@ -33,9 +33,9 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei * @param session The session from which the QueueReceiverImpl is instantiated. * @param queue The default queue for this QueueReceiverImpl. * @param messageSelector the message selector for this QueueReceiverImpl. - * @throws JMSException If the QueueReceiverImpl cannot be created due to some internal error. + * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error. */ - protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException + protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception { super(session, (DestinationImpl) queue, messageSelector, false, null); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java index ed97145c04..89a4d963eb 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java @@ -123,7 +123,16 @@ public class QueueSessionImpl extends SessionImpl implements QueueSession { checkNotClosed(); checkDestination(queue); - return new QueueReceiverImpl(this, queue, messageSelector); + QueueReceiver receiver; + try + { + receiver = new QueueReceiverImpl(this, queue, messageSelector); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return receiver; } /** diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index 1aca96582f..22136814a1 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.nclient.jms.message.*; import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; import javax.jms.*; import javax.jms.IllegalStateException; @@ -28,9 +29,9 @@ import javax.jms.Session; import javax.jms.Message; import javax.jms.MessageListener; import java.io.Serializable; -import java.util.Vector; import java.util.LinkedList; import java.util.HashMap; +import java.util.ArrayList; /** * Implementation of the JMS Session interface @@ -43,43 +44,41 @@ public class SessionImpl implements Session private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); /** - * A queue for incoming messages including synch and asych messages. + * A queue for incoming asynch messages. */ - private LinkedList<QpidMessage> _incomingAsynchronousMessages = new LinkedList<QpidMessage>(); + private final LinkedList<IncomingMessage> _incomingAsynchronousMessages = new LinkedList<IncomingMessage>(); - //--- Session thread locking + //--- MessageDispatcherThread and Session locking /** - * indicates that the sessionThread has stopped + * indicates that the MessageDispatcherThread has stopped */ private boolean _hasStopped = false; /** - * lock for the sessionThread to wait until the session is stopped + * lock for the MessageDispatcherThread to wait until the session is stopped */ - private Object _stoppingLock = new Object(); + private final Object _stoppingLock = new Object(); /** - * lock for the stopper thread to wait on when the sessionThread is stopping + * lock for the stopper thread to wait on when the MessageDispatcherThread is stopping */ - private Object _stoppingJoin = new Object(); + private final Object _stoppingJoin = new Object(); /** * thread to dispatch messages to async consumers */ private MessageDispatcherThread _messageDispatcherThread = null; - + //----END /** * The messageActors of this session. */ - private HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>(); + private final HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>(); /** * All the not yet acknoledged messages - * We use a vector as access to this list has to be synchronised - * This is because messages are acked from messagelistner threads */ - private Vector<QpidMessage> _unacknowledgedMessages = new Vector<QpidMessage>(); + private final ArrayList<QpidMessage> _unacknowledgedMessages = new ArrayList<QpidMessage>(); /** * Indicates whether this session is closed. @@ -156,10 +155,8 @@ public class SessionImpl implements Session { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } - // Create and start a MessageDispatcherThread - // This thread is dispatching messages to the async consumers - _messageDispatcherThread = new MessageDispatcherThread(); - _messageDispatcherThread.start(); + // init the message dispatcher. + initMessageDispatcherThread(); } //--- javax.jms.Session API @@ -379,10 +376,9 @@ public class SessionImpl implements Session // that will stop the sessionThread if (_isStopped) { - start(); + startDispatchThread(); } - - //stop the sessionThread + //notify the sessionThread synchronized (_incomingAsynchronousMessages) { _incomingAsynchronousMessages.notifyAll(); @@ -401,8 +397,9 @@ public class SessionImpl implements Session // from now all the session methods will throw a IllegalStateException _isClosed = true; // close all the actors - closeAllActors(); + closeAllMessageActors(); _messageActors.clear(); + // We may have a thread trying to add a message synchronized (_incomingAsynchronousMessages) { _incomingAsynchronousMessages.clear(); @@ -417,7 +414,6 @@ public class SessionImpl implements Session { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } - } } @@ -451,8 +447,17 @@ public class SessionImpl implements Session // release all unack messages for (QpidMessage message : _unacknowledgedMessages) { - // release all those messages - //Todo: message.getQpidMEssage.release(); + // release this message + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + try + { + getQpidSession().messageRelease(range); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // TODO We can be a little bit cleverer and build a set of ranges } } @@ -559,12 +564,20 @@ public class SessionImpl implements Session * @throws InvalidDestinationException If an invalid destination is specified. * @throws InvalidSelectorException If the message selector is invalid. */ - public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws - JMSException + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException { checkNotClosed(); checkDestination(destination); - MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); + MessageConsumerImpl consumer = null; + try + { + consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } // register this actor with the session _messageActors.put(consumer.getMessageActorID(), consumer); return consumer; @@ -647,12 +660,21 @@ public class SessionImpl implements Session * @throws InvalidDestinationException If an invalid topic is specified. * @throws InvalidSelectorException If the message selector is invalid. */ - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, - boolean noLocal) throws JMSException + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws JMSException { checkNotClosed(); checkDestination(topic); - TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name); + TopicSubscriberImpl subscriber; + try + { + subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, + _connection.getClientID() + ":" + name); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } _messageActors.put(subscriber.getMessageActorID(), subscriber); return subscriber; } @@ -739,43 +761,71 @@ public class SessionImpl implements Session * Remove a message actor form this session * <p> This method is called when an actor is independently closed. * - * @param actor The closed actor. + * @param messageActor The closed actor. + */ + protected void closeMessageActor(MessageActor messageActor) + { + _messageActors.remove(messageActor.getMessageActorID()); + } + + /** + * Idincates whether this session is stopped. + * + * @return True is this session is stopped, false otherwise. */ - protected void closeMessageActor(MessageActor actor) + protected boolean isStopped() { - _messageActors.remove(actor); + return _isStopped; } /** * Start the flow of message to this session. * - * @throws JMSException If starting the session fails due to some communication error. + * @throws Exception If starting the session fails due to some communication error. */ - protected void start() throws JMSException + protected synchronized void start() throws Exception { if (_isStopped) { - synchronized (_stoppingLock) + // start all the MessageActors + for (MessageActor messageActor : _messageActors.values()) { - _isStopped = false; - _stoppingLock.notify(); - } - synchronized (_stoppingJoin) - { - _hasStopped = false; + messageActor.start(); } + startDispatchThread(); + } + } + + /** + * Restart delivery of asynch messages + */ + private void startDispatchThread() + { + synchronized (_stoppingLock) + { + _isStopped = false; + _stoppingLock.notify(); + } + synchronized (_stoppingJoin) + { + _hasStopped = false; } } /** * Stop the flow of message to this session. * - * @throws JMSException If stopping the session fails due to some communication error. + * @throws Exception If stopping the session fails due to some communication error. */ - protected void stop() throws JMSException + protected synchronized void stop() throws Exception { if (!_isClosing && !_isStopped) { + // stop all the MessageActors + for (MessageActor messageActor : _messageActors.values()) + { + messageActor.stop(); + } synchronized (_incomingAsynchronousMessages) { _isStopped = true; @@ -811,6 +861,21 @@ public class SessionImpl implements Session } /** + * Dispatch this message to this session asynchronous consumers + * + * @param consumerID The consumer ID. + * @param message The message to be dispatched. + */ + public void dispatchMessage(String consumerID, QpidMessage message) + { + synchronized (_incomingAsynchronousMessages) + { + _incomingAsynchronousMessages.addLast(new IncomingMessage(consumerID, message)); + _incomingAsynchronousMessages.notifyAll(); + } + } + + /** * Indicate whether this session is recovering . * * @return true if this session is recovering. @@ -850,7 +915,8 @@ public class SessionImpl implements Session { if (dest == null) { - throw new javax.jms.InvalidDestinationException("Invalid destination specified: " + dest, "Invalid destination"); + throw new javax.jms.InvalidDestinationException("Invalid destination specified: " + dest, + "Invalid destination"); } } @@ -869,14 +935,25 @@ public class SessionImpl implements Session { // messages will be acknowldeged by the client application. // store this message for acknowledging it afterward - _unacknowledgedMessages.add(message); + synchronized (_unacknowledgedMessages) + { + _unacknowledgedMessages.add(message); + } } else { // acknowledge this message - // TODO: message.acknowledgeQpidMessge(); + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + try + { + getQpidSession().messageAcknowledge(range); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } - //TODO: Implement DUPS OK heuristic + //tobedone: Implement DUPS OK heuristic } /** @@ -895,13 +972,25 @@ public class SessionImpl implements Session checkNotClosed(); if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { - for (QpidMessage message : _unacknowledgedMessages) + synchronized (_unacknowledgedMessages) { - // acknowledge this message - // TODO: message.acknowledgeQpidMessge(); + for (QpidMessage message : _unacknowledgedMessages) + { + // acknowledge this message + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + try + { + getQpidSession().messageAcknowledge(range); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // TODO We can be a little bit cleverer and build a set of ranges + } + //empty the list of unack messages + _unacknowledgedMessages.clear(); } - //empty the list of unack messages - _unacknowledgedMessages.clear(); } //else there is no effect } @@ -916,13 +1005,23 @@ public class SessionImpl implements Session return _qpidSession; } + /** + * Get this session's conneciton + * + * @return This session's connection + */ + protected ConnectionImpl getConnection() + { + return _connection; + } + //------ Private Methods /** * Close the producer and the consumers of this session * * @throws JMSException If one of the MessaeActor cannot be closed due to some internal error. */ - private void closeAllActors() throws JMSException + private void closeAllMessageActors() throws JMSException { for (MessageActor messageActor : _messageActors.values()) { @@ -930,9 +1029,66 @@ public class SessionImpl implements Session } } + /** + * create and start the MessageDispatcherThread. + */ + private synchronized void initMessageDispatcherThread() + { + // Create and start a MessageDispatcherThread + // This thread is dispatching messages to the async consumers + _messageDispatcherThread = new MessageDispatcherThread(); + _messageDispatcherThread.start(); + } + //------ Inner classes /** + * Convenient class for storing incoming messages associated with a consumer ID. + * <p> Those messages are enqueued in _incomingAsynchronousMessages + */ + private class IncomingMessage + { + // The consumer ID + private String _consumerId; + // The message + private QpidMessage _message; + + //-- constructor + /** + * Creat a new incoming message + * + * @param consumerId The consumer ID + * @param message The message to be delivered + */ + IncomingMessage(String consumerId, QpidMessage message) + { + _consumerId = consumerId; + _message = message; + } + + // Getters + /** + * Get the consumer ID + * + * @return The consumer ID for this message + */ + public String getConsumerId() + { + return _consumerId; + } + + /** + * Get the message. + * + * @return The message. + */ + public QpidMessage getMessage() + { + return _message; + } + } + + /** * A MessageDispatcherThread is attached to every SessionImpl. * <p/> * This thread is responsible for removing messages from m_incomingMessages and @@ -957,9 +1113,8 @@ public class SessionImpl implements Session */ public void run() { - QpidMessage message = null; - - // deliver messages to consumers until the stop flag is set. + IncomingMessage message = null; + // deliver messages to asynchronous consumers until the stop flag is set. do { // When this session is not closing and and stopped @@ -1012,19 +1167,19 @@ public class SessionImpl implements Session MessageConsumerImpl mc; synchronized (_messageActors) { - mc = null; // todo _messageActors.get(message.consumerID); + mc = (MessageConsumerImpl) _messageActors.get(message.getConsumerId()); } - boolean consumed = false; if (mc != null) { try { - // todo call onMessage + mc.onMessage(message.getMessage()); } catch (RuntimeException t) { // the JMS specification tells us to flag that to the client! - _logger.error("Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t); + _logger.error( + "Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t); } } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java index 8537fd1268..39744697a0 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java @@ -139,6 +139,15 @@ public class TopicSessionImpl extends SessionImpl implements TopicSession { checkNotClosed(); checkDestination(topic); - return new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null); - } + TopicSubscriber topicSubscriber; + try + { + topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return topicSubscriber; + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java index 23007a5839..1dfee37b9b 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java @@ -36,10 +36,10 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub * @param noLocal If true inhibits the delivery of messages published by its own connection. * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber. * If this value is null, a non-durable subscription is created. - * @throws javax.jms.JMSException If the TopicSubscriberImpl cannot be created due to internal error. + * @throws Exception If the TopicSubscriberImpl cannot be created due to internal error. */ protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal, - String subscriptionName) throws JMSException + String subscriptionName) throws Exception { super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java index 92a32f9bf4..02a43b0414 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java @@ -21,6 +21,7 @@ package org.apache.qpid.nclient.jms.message; import javax.jms.JMSException; +import javax.jms.Message; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.ContentHeaderProperties; @@ -132,4 +133,18 @@ public class QpidMessage { return getPropertyHeaders().getTimestamp(propertyName); } + + public Message getJMSMessage() + { + // todo + return null; + } + + public Long getMessageID() + { + //todo + return new Long(1); + } } + + |
