summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-03 13:45:41 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-03 13:45:41 +0000
commit079c3e7ab67ad0c0ecb747ff36055ce97b28876e (patch)
tree6fe0f3e7536bbdadb19989beb6c6fa34130ae934 /java
parent6167ee934ff21684a93f43b5efcf47a85f1e4aa2 (diff)
downloadqpid-python-079c3e7ab67ad0c0ecb747ff36055ce97b28876e.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562463 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java (renamed from java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java)18
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Session.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java142
9 files changed, 197 insertions, 95 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java
deleted file mode 100644
index 93b770a285..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient;
-
-import org.apache.qpidity.api.Message;
-
-/**
- * MessageListeners are used to asynchronously receive messages.
- */
-public interface MessageListener
-{
- /**
- * <p>Transfer a message to the listener.
- * You will be notified when the whole message is received
- * However, underneath the message might be streamed off disk
- * or network buffers.
- * </p>
- *
- * @param message The message delivered to the listner.
- */
- public void messageTransfer(Message message);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
index 20a8319409..86e841d5a7 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
@@ -20,20 +20,17 @@ package org.apache.qpid.nclient;
import org.apache.qpidity.Header;
/**
- * <p>This message listener is useful if you need to
- * know when each message part becomes available
- * as opposed to knowing when the whole message arrives.</p>
- * <p/>
+ * Assembles message parts.
* <p> The sequence of event for transferring a message is as follows:
* <ul>
- * <li> n calls to addMessageHeaders (should be usually one or two)
+ * <li> messageHeaders
* <li> n calls to addData
- * <li> {@link org.apache.qpid.nclient.MessageListener#messageTransfer}(<code>null</code>).
+ * <li> messageReceived
* </ul>
* This is up to the implementation to assembled the message when the different parts
* are transferred.
*/
-public interface StreamingMessageListener extends MessageListener
+public interface MessagePartListener
{
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
@@ -41,7 +38,7 @@ public interface StreamingMessageListener extends MessageListener
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
*/
- public void addMessageHeaders(Header... headers);
+ public void messageHeaders(Header... headers);
/**
* Add the following byte array to the content of the message being received
@@ -50,4 +47,9 @@ public interface StreamingMessageListener extends MessageListener
*/
public void addData(byte[] data);
+ /**
+ * Indicates that the message has been fully received.
+ */
+ public void messageReceived();
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
index 9a2b5e63bf..566280ba37 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
@@ -168,12 +168,12 @@ public interface Session
* @param queue The queue this receiver is receiving messages from.
* @param destination The destination for the subscriber ,a.k.a the delivery tag.
* @param listener The listener for this destination. When big message are transfered then
- * it is recommended to use a {@link StreamingMessageListener}.
+ * it is recommended to use a {@link MessagePartListener}.
* @param options Set of Options.
* @param filter The filters to apply to consumed messages.
* @throws QpidException If the session fails to create the receiver due to some error.
*/
- public void messageSubscribe(String queue, String destination, MessageListener listener, Map<String, ?> filter,
+ public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter,
Option... options) throws QpidException;
/**
@@ -192,9 +192,9 @@ public interface Session
*
* @param destination The destination the listener is associated with.
* @param listener The new listener for this destination. When big message are transfered then
- * it is recommended to use a {@link StreamingMessageListener}.
+ * it is recommended to use a {@link MessagePartListener}.
*/
- public void setMessageListener(String destination, MessageListener listener);
+ public void setMessageListener(String destination, MessagePartListener listener);
/**
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
index 099f7ed694..714601032a 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
@@ -3,7 +3,7 @@ package org.apache.qpid.nclient.impl;
import java.util.Map;
import org.apache.qpidity.api.Message;
-import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpid.nclient.MessagePartListener;
import org.apache.qpidity.*;
/**
@@ -57,7 +57,7 @@ public class ClientSession implements org.apache.qpid.nclient.Session
//To change body of implemented methods use File | Settings | File Templates.
}
- public void messageSubscribe(String queue, String destination, MessageListener listener, Map<String, ?> filter,
+ public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter,
Option... options) throws QpidException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -68,7 +68,7 @@ public class ClientSession implements org.apache.qpid.nclient.Session
//To change body of implemented methods use File | Settings | File Templates.
}
- public void setMessageListener(String destination, MessageListener listener)
+ public void setMessageListener(String destination, MessagePartListener listener)
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java
new file mode 100644
index 0000000000..a4167e60cf
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/MessagePartListenerAdapter.java
@@ -0,0 +1,33 @@
+package org.apache.qpid.nclient.impl;
+
+import org.apache.qpid.nclient.MessagePartListener;
+import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpidity.Header;
+import org.apache.qpidity.api.Message;
+
+public class MessagePartListenerAdapter implements MessagePartListener
+{
+ MessageListener _adaptee;
+ Message _currentMsg;
+
+ public MessagePartListenerAdapter(MessageListener listener)
+ {
+ _adaptee = listener;
+ _currentMsg = null;
+ }
+
+ public void addData(byte[] src)
+ {
+ _currentMsg.appendData(src);
+ }
+
+ public void messageHeaders(Header... headers)
+ {
+ //_currentMsg add the headers
+ }
+
+ public void messageReceived()
+ {
+ _adaptee.onMessage(_currentMsg);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
deleted file mode 100644
index ff0ac63540..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.qpid.nclient.impl;
-
-import org.apache.qpid.nclient.MessageListener;
-import org.apache.qpid.nclient.StreamingMessageListener;
-import org.apache.qpidity.Header;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.api.Message;
-
-public class StreamingListenerAdapter implements StreamingMessageListener
-{
- MessageListener _adaptee;
- Message _currentMsg;
-
- public StreamingListenerAdapter(MessageListener l)
- {
- _adaptee = l;
- }
-
- public void addData(byte[] src)
- {
- _currentMsg.appendData(src);
- }
-
- public void addMessageHeaders(Header... headers)
- {
- //_currentMsg add the headers
- }
-
- public void messageTransfer(Message message)
- {
- _adaptee.messageTransfer(_currentMsg);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
index 3689eb60b0..be23e34529 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
@@ -18,8 +18,6 @@
package org.apache.qpid.nclient.jms;
//import org.apache.qpid.nclient.api.MessageReceiver;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -50,7 +48,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
protected String _subscriptionName;
/**
- * A MessageListener set up for this consumer.
+ * A MessagePartListener set up for this consumer.
*/
private MessageListener _messageListener;
@@ -107,7 +105,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
}
/**
- * Gets this MessageConsumer's <CODE>MessageListener</CODE>.
+ * Gets this MessageConsumer's <CODE>MessagePartListener</CODE>.
*
* @return The listener for the MessageConsumer, or null if no listener is set
* @throws JMSException if getting the message listener fails due to some internal error.
@@ -119,7 +117,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
}
/**
- * Sets the MessageConsumer's <CODE>MessageListener</CODE>.
+ * Sets the MessageConsumer's <CODE>MessagePartListener</CODE>.
* <p> The JMS specification says:
* <P>Setting the message listener to null is the equivalent of
* unsetting the message listener for the message consumer.
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
index 4dde127337..003e580007 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
@@ -17,7 +17,7 @@
*/
package org.apache.qpid.nclient.jms;
-import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpid.nclient.MessagePartListener;
import org.apache.qpid.nclient.jms.message.AbstractJMSMessage;
import org.apache.qpid.nclient.jms.message.QpidMessage;
import org.apache.qpidity.api.Message;
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
/**
* This is a wrapper for the JMS message listener
*/
-public class MessageListenerWrapper implements MessageListener
+public class MessageListenerWrapper implements MessagePartListener
{
/**
* Used for debugging.
@@ -55,7 +55,7 @@ public class MessageListenerWrapper implements MessageListener
_consumer = consumer;
}
- //---- org.apache.qpid.nclient.MessageListener API
+ //---- org.apache.qpid.nclient.MessagePartListener API
/**
* Deliver a message to the listener.
*
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
index d0c6569197..ad682d9e70 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
@@ -30,6 +30,7 @@ import javax.jms.MessageListener;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Vector;
+import java.util.LinkedList;
/**
* Implementation of the JMS Session interface
@@ -42,6 +43,28 @@ public class SessionImpl implements Session
private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
/**
+ * A queue for incoming messages including synch and asych messages.
+ */
+ private LinkedList<QpidMessage> _incomingAsynchronousMessages = new LinkedList<QpidMessage>();
+
+ //--- Session thread locking
+ /**
+ * indicates that the sessionThread has stopped
+ */
+ private boolean _hasStopped = false;
+
+ /**
+ * lock for the sessionThread to wiat on when the session is stopped
+ */
+ private Object _stoppingLock = new Object();
+
+ /**
+ * lock for the stopper thread to wait on when the sessionThread is stopping
+ */
+ private Object _stoppingJoin = new Object();
+
+
+ /**
* The messageActors of this session.
*/
private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>();
@@ -59,6 +82,16 @@ public class SessionImpl implements Session
private boolean _isClosed = false;
/**
+ * Indicates whether this session is closing.
+ */
+ private boolean _isClosing = false;
+
+ /**
+ * Indicates whether this session is stopped.
+ */
+ private boolean _isStopped = false;
+
+ /**
* Used to indicate whether or not this is a transactional session.
*/
private boolean _transacted;
@@ -687,7 +720,7 @@ public class SessionImpl implements Session
*/
protected void stop() throws JMSException
{
- // TODO: make sure that the correct options are used
+ // TODO: make sure that the correct options are used
}
/**
@@ -819,4 +852,111 @@ public class SessionImpl implements Session
messageActor.closeMessageActor();
}
}
+
+ //------ Inner classes
+
+ /**
+ * A MessageDispatcherThread is attached to every SessionImpl.
+ * <p/>
+ * This thread is responsible for removing messages from m_incomingMessages and
+ * dispatching them to the appropriate MessageConsumer.
+ * <p> Messages have to be dispatched serially.
+ *
+ * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous message consumer {0} from session {1} has thrown a RunTimeException "{2}".
+ */
+ private class MessageDispatcherThread extends Thread
+ {
+ //--- Constructor
+ /**
+ * Create a Deamon thread for dispatching messages to this session listeners.
+ */
+ MessageDispatcherThread()
+ {
+ super("MessageDispatcher");
+ // this thread is Deamon
+ setDaemon(true);
+ }
+
+ /**
+ * Use to run this thread.
+ */
+ public void run()
+ {
+ QpidMessage message = null;
+
+ // deliver messages to consumers until the stop flag is set.
+ do
+ {
+ // When this session is not closing and and stopped
+ // then this thread needs to wait until messages are delivered.
+ synchronized (_incomingAsynchronousMessages)
+ {
+ while (!_isClosing && !_isStopped && _incomingAsynchronousMessages.isEmpty())
+ {
+ try
+ {
+ _incomingAsynchronousMessages.wait();
+ }
+ catch (InterruptedException ie)
+ {
+ /* ignore */
+ }
+ }
+ }
+ // If this session is stopped then we need to wait on the stoppingLock
+ synchronized (_stoppingLock)
+ {
+ try
+ {
+ while (_isStopped)
+ {
+ // if the session is stopped we have to notify the stopper thread
+ synchronized (_stoppingJoin)
+ {
+ _hasStopped = true;
+ _stoppingJoin.notify();
+ }
+ _stoppingLock.wait();
+ }
+ }
+ catch (Exception ie)
+ {
+ /* ignore */
+ }
+ }
+ synchronized (_incomingAsynchronousMessages)
+ {
+ if (!_isClosing && !_incomingAsynchronousMessages.isEmpty())
+ {
+ message = _incomingAsynchronousMessages.getFirst();
+ }
+ }
+
+ /* if (message != null)
+ {
+ MessageConsumerImpl mc;
+ synchronized (_actors)
+ {
+ mc = (MessageConsumerImpl) m_actors.get(actorMessage.consumerID);
+ }
+ boolean consumed = false;
+ if (mc != null)
+ {
+ try
+ {
+ consumed = mc.onMessage(actorMessage.genericMessage);
+ }
+ catch (RuntimeException t)
+ {
+ // the JMS specification tells us to flag that to the client!
+ log.errorb(SessionThread.class.getName(), "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t);
+ }
+ }
+ } */
+ message = null;
+ }
+ while (!_isClosing); // repeat as long as this session is not closing
+ }
+ }
+
}