summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-06 19:20:13 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-06 19:20:13 +0000
commit9c672d587e40572848e996c731db4b793a16e0ce (patch)
tree29e413c37b499aeacfab0ca553865d2e9cb8fe21 /java
parent088b7c8461979ee8df9480cb9105cc045fd632b6 (diff)
downloadqpid-python-9c672d587e40572848e996c731db4b793a16e0ce.tar.gz
Implemented queue browsing
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563226 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java70
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java189
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java13
4 files changed, 267 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 07f274381f..3fe32d4e07 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -348,7 +348,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// This indicate to the delivery thread to deliver the message to this consumer
// as it can happens that a message is delivered after a receive operation as returned.
_isReceiving = true;
- boolean received = false;
+ int received = 0;
if (!_isStopped)
{
// if this consumer is stopped then this will be call when starting
@@ -356,7 +356,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
.messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
received = getSession().getQpidSession().messageFlush(getMessageActorID());
}
- if (!received && timeout < 0)
+ if ( received == 0 && timeout < 0)
{
// this is a nowait and we havent received a message then we must immediatly return
result = null;
@@ -489,8 +489,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().getQpidSession()
.messageFlow(getMessageActorID(),
org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
- boolean received = getSession().getQpidSession().messageFlush(getMessageActorID());
- if (!received && _isNoWaitIsReceiving)
+ int received = getSession().getQpidSession().messageFlush(getMessageActorID());
+ if ( received == 0 && _isNoWaitIsReceiving)
{
// Right a message nowait is waiting for a message
// but no one can be delivered it then need to return
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
new file mode 100644
index 0000000000..112af50190
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
@@ -0,0 +1,70 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.MessageListener;
+import org.apache.qpidity.api.Message;
+
+/**
+ * This listener idspatches messaes to its browser.
+ */
+public class QpidBrowserListener implements MessageListener
+{
+ /**
+ * Used for debugging.
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
+
+ /**
+ * This message listener's browser.
+ */
+ QueueBrowserImpl _browser = null;
+
+ //---- constructor
+ /**
+ * Create a message listener wrapper for a given browser
+ *
+ * @param browser The browser of this listener
+ */
+ public QpidBrowserListener(QueueBrowserImpl browser)
+ {
+ _browser = browser;
+ }
+
+ //---- org.apache.qpidity.MessagePartListener API
+ /**
+ * Deliver a message to the listener.
+ *
+ * @param message The message delivered to the listner.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ //convert this message into a JMS one
+ javax.jms.Message jmsMessage = null; // todo
+ _browser.receiveMessage(jmsMessage);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
index ad7a777c37..29821e1b81 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
@@ -17,10 +17,18 @@
*/
package org.apache.qpidity.jms;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.MessagePartListener;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.impl.MessagePartListenerAdapter;
+
import javax.jms.QueueBrowser;
import javax.jms.JMSException;
import javax.jms.Queue;
+import javax.jms.Message;
import java.util.Enumeration;
+import java.util.NoSuchElementException;
/**
* Implementation of the JMS QueueBrowser interface
@@ -32,6 +40,36 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
*/
private String _messageSelector = null;
+ /**
+ * The message selector filter associated with this browser
+ */
+ private MessageFilter _filter = null;
+
+ /**
+ * The batch of messages to browse.
+ */
+ private Message[] _messages;
+
+ /**
+ * The number of messages read from current batch.
+ */
+ private int _browsed = 0;
+
+ /**
+ * The number of messages received from current batch.
+ */
+ private int _received = 0;
+
+ /**
+ * Indicates whether the last message has been received.
+ */
+ private int _batchLength;
+
+ /**
+ * The batch max size
+ */
+ private final int _maxbatchlength = 10;
+
//--- constructor
/**
@@ -40,13 +78,26 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
* @param session The session of this browser.
* @param queue The queue name for this browser
* @param messageSelector only messages with properties matching the message selector expression are delivered.
- * @throws JMSException In case of internal problem when creating this browser.
+ * @throws Exception In case of internal problem when creating this browser.
*/
- protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException
+ protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception
{
super(session, (DestinationImpl) queue);
- _messageSelector = messageSelector;
- //-- TODO: Create the QPid browser
+ // this is an array representing a batch of messages for this browser.
+ _messages = new Message[_maxbatchlength];
+ if (messageSelector != null)
+ {
+ _messageSelector = messageSelector;
+ _filter = new JMSSelectorFilter(messageSelector);
+ }
+ MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidBrowserListener(this));
+ // this is a queue we expect that this queue exists
+ getSession().getQpidSession()
+ .messageSubscribe(queue.getQueueName(), getMessageActorID(),
+ org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED,
+ // We do not acquire those messages
+ org.apache.qpidity.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
+
}
//--- javax.jms.QueueBrowser API
@@ -58,10 +109,11 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
*/
public Enumeration getEnumeration() throws JMSException
{
- // TODO
- return null;
+ requestMessages();
+ return new MessageEnumeration();
}
+
/**
* Get the queue associated with this queue browser.
*
@@ -70,6 +122,7 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
*/
public Queue getQueue() throws JMSException
{
+ checkNotClosed();
return (Queue) _destination;
}
@@ -81,6 +134,130 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
*/
public String getMessageSelector() throws JMSException
{
+ checkNotClosed();
return _messageSelector;
}
+
+ //-- overwritten methods.
+ /**
+ * Closes the browser and deregister it from its session.
+ *
+ * @throws JMSException if the MessaeActor cannot be closed due to some internal error.
+ */
+ public void close() throws JMSException
+ {
+ synchronized (_messages)
+ {
+ _received = 0;
+ _browsed = 0;
+ _batchLength = 0;
+ _messages.notify();
+ }
+ super.close();
+ }
+
+ //-- nonpublic methods
+ /**
+ * Request _maxbatchlength messages
+ *
+ * @throws JMSException If requesting more messages fails due to some internal error.
+ */
+ private void requestMessages() throws JMSException
+ {
+ _browsed = 0;
+ _received = 0;
+ // request messages
+ int received = 0;
+ try
+ {
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ _maxbatchlength);
+ _batchLength = getSession().getQpidSession().messageFlush(getMessageActorID());
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * This method is invoked by the listener when a message is dispatched to this browser.
+ *
+ * @param m A received message
+ */
+ protected void receiveMessage(Message m)
+ {
+ synchronized (_messages)
+ {
+ _messages[_received] = m;
+ _received++;
+ _messages.notify();
+ }
+ }
+
+ //-- inner class
+ /**
+ * This is an implementation of the Enumeration interface.
+ */
+ private class MessageEnumeration implements Enumeration
+ {
+ /*
+ * Whether this enumeration has any more elements.
+ *
+ * @return True if there any more elements.
+ */
+ public boolean hasMoreElements()
+ {
+ boolean result = false;
+ // Try to work out whether there are any more messages available.
+ try
+ {
+ if (_browsed >= _maxbatchlength)
+ {
+ requestMessages();
+ }
+ synchronized (_messages)
+ {
+ while (_received == _browsed && _batchLength > _browsed)
+ {
+ // we expect more messages
+ _messages.wait();
+ }
+ if (_browsed < _received && _batchLength != _browsed)
+ {
+ result = true;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ // If no batch could be returned, the result should be false, therefore do nothing
+ }
+ return result;
+ }
+
+ /**
+ * Get the next message element
+ *
+ * @return The next element.
+ */
+ public Object nextElement()
+ {
+ if (hasMoreElements())
+ {
+ synchronized (_messages)
+ {
+ Message message = _messages[_browsed];
+ _browsed = _browsed + 1;
+ return message;
+ }
+ }
+ else
+ {
+ throw new NoSuchElementException();
+ }
+ }
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 4723257794..0a75673961 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -129,8 +129,7 @@ public class SessionImpl implements Session
* @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
* {@link Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter is true.
* @param isXA Indicates whether this session is an XA session.
- * @throws JMSSecurityException If the user could not be authenticated.
- * @throws QpidException In case of internal error.
+ * @throws QpidException In case of internal error.
*/
protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode, boolean isXA)
throws QpidException
@@ -717,7 +716,15 @@ public class SessionImpl implements Session
{
checkNotClosed();
checkDestination(queue);
- QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector);
+ QueueBrowserImpl browser;
+ try
+ {
+ browser = new QueueBrowserImpl(this, queue, messageSelector);
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
// register this actor with the session
_messageActors.put(browser.getMessageActorID(), browser);
return browser;