diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-03 13:45:41 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-03 13:45:41 +0000 |
| commit | 079c3e7ab67ad0c0ecb747ff36055ce97b28876e (patch) | |
| tree | 6fe0f3e7536bbdadb19989beb6c6fa34130ae934 /java | |
| parent | 6167ee934ff21684a93f43b5efcf47a85f1e4aa2 (diff) | |
| download | qpid-python-079c3e7ab67ad0c0ecb747ff36055ce97b28876e.tar.gz | |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562463 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java | 38 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java (renamed from java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java) | 18 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/Session.java | 8 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java | 6 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java | 33 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java | 33 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java | 8 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java | 6 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java | 142 |
9 files changed, 197 insertions, 95 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java deleted file mode 100644 index 93b770a285..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpidity.api.Message; - -/** - * MessageListeners are used to asynchronously receive messages. - */ -public interface MessageListener -{ - /** - * <p>Transfer a message to the listener. - * You will be notified when the whole message is received - * However, underneath the message might be streamed off disk - * or network buffers. - * </p> - * - * @param message The message delivered to the listner. - */ - public void messageTransfer(Message message); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java index 20a8319409..86e841d5a7 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java @@ -20,20 +20,17 @@ package org.apache.qpid.nclient; import org.apache.qpidity.Header; /** - * <p>This message listener is useful if you need to - * know when each message part becomes available - * as opposed to knowing when the whole message arrives.</p> - * <p/> + * Assembles message parts. * <p> The sequence of event for transferring a message is as follows: * <ul> - * <li> n calls to addMessageHeaders (should be usually one or two) + * <li> messageHeaders * <li> n calls to addData - * <li> {@link org.apache.qpid.nclient.MessageListener#messageTransfer}(<code>null</code>). + * <li> messageReceived * </ul> * This is up to the implementation to assembled the message when the different parts * are transferred. */ -public interface StreamingMessageListener extends MessageListener +public interface MessagePartListener { /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} @@ -41,7 +38,7 @@ public interface StreamingMessageListener extends MessageListener * * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> */ - public void addMessageHeaders(Header... headers); + public void messageHeaders(Header... headers); /** * Add the following byte array to the content of the message being received @@ -50,4 +47,9 @@ public interface StreamingMessageListener extends MessageListener */ public void addData(byte[] data); + /** + * Indicates that the message has been fully received. + */ + public void messageReceived(); + } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java index 9a2b5e63bf..566280ba37 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java @@ -168,12 +168,12 @@ public interface Session * @param queue The queue this receiver is receiving messages from. * @param destination The destination for the subscriber ,a.k.a the delivery tag. * @param listener The listener for this destination. When big message are transfered then - * it is recommended to use a {@link StreamingMessageListener}. + * it is recommended to use a {@link MessagePartListener}. * @param options Set of Options. * @param filter The filters to apply to consumed messages. * @throws QpidException If the session fails to create the receiver due to some error. */ - public void messageSubscribe(String queue, String destination, MessageListener listener, Map<String, ?> filter, + public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter, Option... options) throws QpidException; /** @@ -192,9 +192,9 @@ public interface Session * * @param destination The destination the listener is associated with. * @param listener The new listener for this destination. When big message are transfered then - * it is recommended to use a {@link StreamingMessageListener}. + * it is recommended to use a {@link MessagePartListener}. */ - public void setMessageListener(String destination, MessageListener listener); + public void setMessageListener(String destination, MessagePartListener listener); /** diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java index 099f7ed694..714601032a 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -3,7 +3,7 @@ package org.apache.qpid.nclient.impl; import java.util.Map; import org.apache.qpidity.api.Message; -import org.apache.qpid.nclient.MessageListener; +import org.apache.qpid.nclient.MessagePartListener; import org.apache.qpidity.*; /** @@ -57,7 +57,7 @@ public class ClientSession implements org.apache.qpid.nclient.Session //To change body of implemented methods use File | Settings | File Templates. } - public void messageSubscribe(String queue, String destination, MessageListener listener, Map<String, ?> filter, + public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter, Option... options) throws QpidException { //To change body of implemented methods use File | Settings | File Templates. @@ -68,7 +68,7 @@ public class ClientSession implements org.apache.qpid.nclient.Session //To change body of implemented methods use File | Settings | File Templates. } - public void setMessageListener(String destination, MessageListener listener) + public void setMessageListener(String destination, MessagePartListener listener) { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java new file mode 100644 index 0000000000..a4167e60cf --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java @@ -0,0 +1,33 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.qpid.nclient.MessagePartListener; +import org.apache.qpid.nclient.MessageListener; +import org.apache.qpidity.Header; +import org.apache.qpidity.api.Message; + +public class MessagePartListenerAdapter implements MessagePartListener +{ + MessageListener _adaptee; + Message _currentMsg; + + public MessagePartListenerAdapter(MessageListener listener) + { + _adaptee = listener; + _currentMsg = null; + } + + public void addData(byte[] src) + { + _currentMsg.appendData(src); + } + + public void messageHeaders(Header... headers) + { + //_currentMsg add the headers + } + + public void messageReceived() + { + _adaptee.onMessage(_currentMsg); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java deleted file mode 100644 index ff0ac63540..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.qpid.nclient.impl; - -import org.apache.qpid.nclient.MessageListener; -import org.apache.qpid.nclient.StreamingMessageListener; -import org.apache.qpidity.Header; -import org.apache.qpidity.Option; -import org.apache.qpidity.api.Message; - -public class StreamingListenerAdapter implements StreamingMessageListener -{ - MessageListener _adaptee; - Message _currentMsg; - - public StreamingListenerAdapter(MessageListener l) - { - _adaptee = l; - } - - public void addData(byte[] src) - { - _currentMsg.appendData(src); - } - - public void addMessageHeaders(Header... headers) - { - //_currentMsg add the headers - } - - public void messageTransfer(Message message) - { - _adaptee.messageTransfer(_currentMsg); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java index 3689eb60b0..be23e34529 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -18,8 +18,6 @@ package org.apache.qpid.nclient.jms; //import org.apache.qpid.nclient.api.MessageReceiver; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -50,7 +48,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer protected String _subscriptionName; /** - * A MessageListener set up for this consumer. + * A MessagePartListener set up for this consumer. */ private MessageListener _messageListener; @@ -107,7 +105,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } /** - * Gets this MessageConsumer's <CODE>MessageListener</CODE>. + * Gets this MessageConsumer's <CODE>MessagePartListener</CODE>. * * @return The listener for the MessageConsumer, or null if no listener is set * @throws JMSException if getting the message listener fails due to some internal error. @@ -119,7 +117,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } /** - * Sets the MessageConsumer's <CODE>MessageListener</CODE>. + * Sets the MessageConsumer's <CODE>MessagePartListener</CODE>. * <p> The JMS specification says: * <P>Setting the message listener to null is the equivalent of * unsetting the message listener for the message consumer. diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java index 4dde127337..003e580007 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java @@ -17,7 +17,7 @@ */ package org.apache.qpid.nclient.jms; -import org.apache.qpid.nclient.MessageListener; +import org.apache.qpid.nclient.MessagePartListener; import org.apache.qpid.nclient.jms.message.AbstractJMSMessage; import org.apache.qpid.nclient.jms.message.QpidMessage; import org.apache.qpidity.api.Message; @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; /** * This is a wrapper for the JMS message listener */ -public class MessageListenerWrapper implements MessageListener +public class MessageListenerWrapper implements MessagePartListener { /** * Used for debugging. @@ -55,7 +55,7 @@ public class MessageListenerWrapper implements MessageListener _consumer = consumer; } - //---- org.apache.qpid.nclient.MessageListener API + //---- org.apache.qpid.nclient.MessagePartListener API /** * Deliver a message to the listener. * diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index d0c6569197..ad682d9e70 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -30,6 +30,7 @@ import javax.jms.MessageListener; import java.io.Serializable; import java.util.ArrayList; import java.util.Vector; +import java.util.LinkedList; /** * Implementation of the JMS Session interface @@ -42,6 +43,28 @@ public class SessionImpl implements Session private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); /** + * A queue for incoming messages including synch and asych messages. + */ + private LinkedList<QpidMessage> _incomingAsynchronousMessages = new LinkedList<QpidMessage>(); + + //--- Session thread locking + /** + * indicates that the sessionThread has stopped + */ + private boolean _hasStopped = false; + + /** + * lock for the sessionThread to wiat on when the session is stopped + */ + private Object _stoppingLock = new Object(); + + /** + * lock for the stopper thread to wait on when the sessionThread is stopping + */ + private Object _stoppingJoin = new Object(); + + + /** * The messageActors of this session. */ private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>(); @@ -59,6 +82,16 @@ public class SessionImpl implements Session private boolean _isClosed = false; /** + * Indicates whether this session is closing. + */ + private boolean _isClosing = false; + + /** + * Indicates whether this session is stopped. + */ + private boolean _isStopped = false; + + /** * Used to indicate whether or not this is a transactional session. */ private boolean _transacted; @@ -687,7 +720,7 @@ public class SessionImpl implements Session */ protected void stop() throws JMSException { - // TODO: make sure that the correct options are used + // TODO: make sure that the correct options are used } /** @@ -819,4 +852,111 @@ public class SessionImpl implements Session messageActor.closeMessageActor(); } } + + //------ Inner classes + + /** + * A MessageDispatcherThread is attached to every SessionImpl. + * <p/> + * This thread is responsible for removing messages from m_incomingMessages and + * dispatching them to the appropriate MessageConsumer. + * <p> Messages have to be dispatched serially. + * + * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous message consumer {0} from session {1} has thrown a RunTimeException "{2}". + */ + private class MessageDispatcherThread extends Thread + { + //--- Constructor + /** + * Create a Deamon thread for dispatching messages to this session listeners. + */ + MessageDispatcherThread() + { + super("MessageDispatcher"); + // this thread is Deamon + setDaemon(true); + } + + /** + * Use to run this thread. + */ + public void run() + { + QpidMessage message = null; + + // deliver messages to consumers until the stop flag is set. + do + { + // When this session is not closing and and stopped + // then this thread needs to wait until messages are delivered. + synchronized (_incomingAsynchronousMessages) + { + while (!_isClosing && !_isStopped && _incomingAsynchronousMessages.isEmpty()) + { + try + { + _incomingAsynchronousMessages.wait(); + } + catch (InterruptedException ie) + { + /* ignore */ + } + } + } + // If this session is stopped then we need to wait on the stoppingLock + synchronized (_stoppingLock) + { + try + { + while (_isStopped) + { + // if the session is stopped we have to notify the stopper thread + synchronized (_stoppingJoin) + { + _hasStopped = true; + _stoppingJoin.notify(); + } + _stoppingLock.wait(); + } + } + catch (Exception ie) + { + /* ignore */ + } + } + synchronized (_incomingAsynchronousMessages) + { + if (!_isClosing && !_incomingAsynchronousMessages.isEmpty()) + { + message = _incomingAsynchronousMessages.getFirst(); + } + } + + /* if (message != null) + { + MessageConsumerImpl mc; + synchronized (_actors) + { + mc = (MessageConsumerImpl) m_actors.get(actorMessage.consumerID); + } + boolean consumed = false; + if (mc != null) + { + try + { + consumed = mc.onMessage(actorMessage.genericMessage); + } + catch (RuntimeException t) + { + // the JMS specification tells us to flag that to the client! + log.errorb(SessionThread.class.getName(), "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t); + } + } + } */ + message = null; + } + while (!_isClosing); // repeat as long as this session is not closing + } + } + } |
