From 079c3e7ab67ad0c0ecb747ff36055ce97b28876e Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Fri, 3 Aug 2007 13:45:41 +0000 Subject: git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562463 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/nclient/MessageListener.java | 38 ------ .../apache/qpid/nclient/MessagePartListener.java | 55 ++++++++ .../main/java/org/apache/qpid/nclient/Session.java | 8 +- .../qpid/nclient/StreamingMessageListener.java | 53 -------- .../apache/qpid/nclient/impl/ClientSession.java | 6 +- .../nclient/impl/MessagePartListenerAdapter.java | 33 +++++ .../nclient/impl/StreamingListenerAdapter.java | 33 ----- .../qpid/nclient/jms/MessageConsumerImpl.java | 8 +- .../qpid/nclient/jms/MessageListenerWrapper.java | 6 +- .../org/apache/qpid/nclient/jms/SessionImpl.java | 142 ++++++++++++++++++++- 10 files changed, 242 insertions(+), 140 deletions(-) delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java deleted file mode 100644 index 93b770a285..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpidity.api.Message; - -/** - * MessageListeners are used to asynchronously receive messages. - */ -public interface MessageListener -{ - /** - *

Transfer a message to the listener. - * You will be notified when the whole message is received - * However, underneath the message might be streamed off disk - * or network buffers. - *

- * - * @param message The message delivered to the listner. - */ - public void messageTransfer(Message message); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java new file mode 100644 index 0000000000..86e841d5a7 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java @@ -0,0 +1,55 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpidity.Header; + +/** + * Assembles message parts. + *

The sequence of event for transferring a message is as follows: + *

+ * This is up to the implementation to assembled the message when the different parts + * are transferred. + */ +public interface MessagePartListener +{ + /** + * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} + * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. + * + * @param headers Either DeliveryProperties or ApplicationProperties + */ + public void messageHeaders(Header... headers); + + /** + * Add the following byte array to the content of the message being received + * + * @param data Data to be added or streamed. + */ + public void addData(byte[] data); + + /** + * Indicates that the message has been fully received. + */ + public void messageReceived(); + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java index 9a2b5e63bf..566280ba37 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java @@ -168,12 +168,12 @@ public interface Session * @param queue The queue this receiver is receiving messages from. * @param destination The destination for the subscriber ,a.k.a the delivery tag. * @param listener The listener for this destination. When big message are transfered then - * it is recommended to use a {@link StreamingMessageListener}. + * it is recommended to use a {@link MessagePartListener}. * @param options Set of Options. * @param filter The filters to apply to consumed messages. * @throws QpidException If the session fails to create the receiver due to some error. */ - public void messageSubscribe(String queue, String destination, MessageListener listener, Map filter, + public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map filter, Option... options) throws QpidException; /** @@ -192,9 +192,9 @@ public interface Session * * @param destination The destination the listener is associated with. * @param listener The new listener for this destination. When big message are transfered then - * it is recommended to use a {@link StreamingMessageListener}. + * it is recommended to use a {@link MessagePartListener}. */ - public void setMessageListener(String destination, MessageListener listener); + public void setMessageListener(String destination, MessagePartListener listener); /** diff --git a/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java deleted file mode 100644 index 20a8319409..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java +++ /dev/null @@ -1,53 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpidity.Header; - -/** - *

This message listener is useful if you need to - * know when each message part becomes available - * as opposed to knowing when the whole message arrives.

- *

- *

The sequence of event for transferring a message is as follows: - *

    - *
  • n calls to addMessageHeaders (should be usually one or two) - *
  • n calls to addData - *
  • {@link org.apache.qpid.nclient.MessageListener#messageTransfer}(null). - *
- * This is up to the implementation to assembled the message when the different parts - * are transferred. - */ -public interface StreamingMessageListener extends MessageListener -{ - /** - * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} - * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. - * - * @param headers Either DeliveryProperties or ApplicationProperties - */ - public void addMessageHeaders(Header... headers); - - /** - * Add the following byte array to the content of the message being received - * - * @param data Data to be added or streamed. - */ - public void addData(byte[] data); - -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java index 099f7ed694..714601032a 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -3,7 +3,7 @@ package org.apache.qpid.nclient.impl; import java.util.Map; import org.apache.qpidity.api.Message; -import org.apache.qpid.nclient.MessageListener; +import org.apache.qpid.nclient.MessagePartListener; import org.apache.qpidity.*; /** @@ -57,7 +57,7 @@ public class ClientSession implements org.apache.qpid.nclient.Session //To change body of implemented methods use File | Settings | File Templates. } - public void messageSubscribe(String queue, String destination, MessageListener listener, Map filter, + public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map filter, Option... options) throws QpidException { //To change body of implemented methods use File | Settings | File Templates. @@ -68,7 +68,7 @@ public class ClientSession implements org.apache.qpid.nclient.Session //To change body of implemented methods use File | Settings | File Templates. } - public void setMessageListener(String destination, MessageListener listener) + public void setMessageListener(String destination, MessagePartListener listener) { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java new file mode 100644 index 0000000000..a4167e60cf --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java @@ -0,0 +1,33 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.qpid.nclient.MessagePartListener; +import org.apache.qpid.nclient.MessageListener; +import org.apache.qpidity.Header; +import org.apache.qpidity.api.Message; + +public class MessagePartListenerAdapter implements MessagePartListener +{ + MessageListener _adaptee; + Message _currentMsg; + + public MessagePartListenerAdapter(MessageListener listener) + { + _adaptee = listener; + _currentMsg = null; + } + + public void addData(byte[] src) + { + _currentMsg.appendData(src); + } + + public void messageHeaders(Header... headers) + { + //_currentMsg add the headers + } + + public void messageReceived() + { + _adaptee.onMessage(_currentMsg); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java deleted file mode 100644 index ff0ac63540..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.qpid.nclient.impl; - -import org.apache.qpid.nclient.MessageListener; -import org.apache.qpid.nclient.StreamingMessageListener; -import org.apache.qpidity.Header; -import org.apache.qpidity.Option; -import org.apache.qpidity.api.Message; - -public class StreamingListenerAdapter implements StreamingMessageListener -{ - MessageListener _adaptee; - Message _currentMsg; - - public StreamingListenerAdapter(MessageListener l) - { - _adaptee = l; - } - - public void addData(byte[] src) - { - _currentMsg.appendData(src); - } - - public void addMessageHeaders(Header... headers) - { - //_currentMsg add the headers - } - - public void messageTransfer(Message message) - { - _adaptee.messageTransfer(_currentMsg); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java index 3689eb60b0..be23e34529 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -18,8 +18,6 @@ package org.apache.qpid.nclient.jms; //import org.apache.qpid.nclient.api.MessageReceiver; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -50,7 +48,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer protected String _subscriptionName; /** - * A MessageListener set up for this consumer. + * A MessagePartListener set up for this consumer. */ private MessageListener _messageListener; @@ -107,7 +105,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } /** - * Gets this MessageConsumer's MessageListener. + * Gets this MessageConsumer's MessagePartListener. * * @return The listener for the MessageConsumer, or null if no listener is set * @throws JMSException if getting the message listener fails due to some internal error. @@ -119,7 +117,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } /** - * Sets the MessageConsumer's MessageListener. + * Sets the MessageConsumer's MessagePartListener. *

The JMS specification says: *

Setting the message listener to null is the equivalent of * unsetting the message listener for the message consumer. diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java index 4dde127337..003e580007 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java @@ -17,7 +17,7 @@ */ package org.apache.qpid.nclient.jms; -import org.apache.qpid.nclient.MessageListener; +import org.apache.qpid.nclient.MessagePartListener; import org.apache.qpid.nclient.jms.message.AbstractJMSMessage; import org.apache.qpid.nclient.jms.message.QpidMessage; import org.apache.qpidity.api.Message; @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; /** * This is a wrapper for the JMS message listener */ -public class MessageListenerWrapper implements MessageListener +public class MessageListenerWrapper implements MessagePartListener { /** * Used for debugging. @@ -55,7 +55,7 @@ public class MessageListenerWrapper implements MessageListener _consumer = consumer; } - //---- org.apache.qpid.nclient.MessageListener API + //---- org.apache.qpid.nclient.MessagePartListener API /** * Deliver a message to the listener. * diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index d0c6569197..ad682d9e70 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -30,6 +30,7 @@ import javax.jms.MessageListener; import java.io.Serializable; import java.util.ArrayList; import java.util.Vector; +import java.util.LinkedList; /** * Implementation of the JMS Session interface @@ -41,6 +42,28 @@ public class SessionImpl implements Session */ private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); + /** + * A queue for incoming messages including synch and asych messages. + */ + private LinkedList _incomingAsynchronousMessages = new LinkedList(); + + //--- Session thread locking + /** + * indicates that the sessionThread has stopped + */ + private boolean _hasStopped = false; + + /** + * lock for the sessionThread to wiat on when the session is stopped + */ + private Object _stoppingLock = new Object(); + + /** + * lock for the stopper thread to wait on when the sessionThread is stopping + */ + private Object _stoppingJoin = new Object(); + + /** * The messageActors of this session. */ @@ -58,6 +81,16 @@ public class SessionImpl implements Session */ private boolean _isClosed = false; + /** + * Indicates whether this session is closing. + */ + private boolean _isClosing = false; + + /** + * Indicates whether this session is stopped. + */ + private boolean _isStopped = false; + /** * Used to indicate whether or not this is a transactional session. */ @@ -687,7 +720,7 @@ public class SessionImpl implements Session */ protected void stop() throws JMSException { - // TODO: make sure that the correct options are used + // TODO: make sure that the correct options are used } /** @@ -819,4 +852,111 @@ public class SessionImpl implements Session messageActor.closeMessageActor(); } } + + //------ Inner classes + + /** + * A MessageDispatcherThread is attached to every SessionImpl. + *

+ * This thread is responsible for removing messages from m_incomingMessages and + * dispatching them to the appropriate MessageConsumer. + *

Messages have to be dispatched serially. + * + * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous message consumer {0} from session {1} has thrown a RunTimeException "{2}". + */ + private class MessageDispatcherThread extends Thread + { + //--- Constructor + /** + * Create a Deamon thread for dispatching messages to this session listeners. + */ + MessageDispatcherThread() + { + super("MessageDispatcher"); + // this thread is Deamon + setDaemon(true); + } + + /** + * Use to run this thread. + */ + public void run() + { + QpidMessage message = null; + + // deliver messages to consumers until the stop flag is set. + do + { + // When this session is not closing and and stopped + // then this thread needs to wait until messages are delivered. + synchronized (_incomingAsynchronousMessages) + { + while (!_isClosing && !_isStopped && _incomingAsynchronousMessages.isEmpty()) + { + try + { + _incomingAsynchronousMessages.wait(); + } + catch (InterruptedException ie) + { + /* ignore */ + } + } + } + // If this session is stopped then we need to wait on the stoppingLock + synchronized (_stoppingLock) + { + try + { + while (_isStopped) + { + // if the session is stopped we have to notify the stopper thread + synchronized (_stoppingJoin) + { + _hasStopped = true; + _stoppingJoin.notify(); + } + _stoppingLock.wait(); + } + } + catch (Exception ie) + { + /* ignore */ + } + } + synchronized (_incomingAsynchronousMessages) + { + if (!_isClosing && !_incomingAsynchronousMessages.isEmpty()) + { + message = _incomingAsynchronousMessages.getFirst(); + } + } + + /* if (message != null) + { + MessageConsumerImpl mc; + synchronized (_actors) + { + mc = (MessageConsumerImpl) m_actors.get(actorMessage.consumerID); + } + boolean consumed = false; + if (mc != null) + { + try + { + consumed = mc.onMessage(actorMessage.genericMessage); + } + catch (RuntimeException t) + { + // the JMS specification tells us to flag that to the client! + log.errorb(SessionThread.class.getName(), "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t); + } + } + } */ + message = null; + } + while (!_isClosing); // repeat as long as this session is not closing + } + } + } -- cgit v1.2.1