diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 19:20:13 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 19:20:13 +0000 |
| commit | 9c672d587e40572848e996c731db4b793a16e0ce (patch) | |
| tree | 29e413c37b499aeacfab0ca553865d2e9cb8fe21 /java | |
| parent | 088b7c8461979ee8df9480cb9105cc045fd632b6 (diff) | |
| download | qpid-python-9c672d587e40572848e996c731db4b793a16e0ce.tar.gz | |
Implemented queue browsing
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563226 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 267 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 07f274381f..3fe32d4e07 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -348,7 +348,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // This indicate to the delivery thread to deliver the message to this consumer // as it can happens that a message is delivered after a receive operation as returned. _isReceiving = true; - boolean received = false; + int received = 0; if (!_isStopped) { // if this consumer is stopped then this will be call when starting @@ -356,7 +356,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); received = getSession().getQpidSession().messageFlush(getMessageActorID()); } - if (!received && timeout < 0) + if ( received == 0 && timeout < 0) { // this is a nowait and we havent received a message then we must immediatly return result = null; @@ -489,8 +489,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().getQpidSession() .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - boolean received = getSession().getQpidSession().messageFlush(getMessageActorID()); - if (!received && _isNoWaitIsReceiving) + int received = getSession().getQpidSession().messageFlush(getMessageActorID()); + if ( received == 0 && _isNoWaitIsReceiving) { // Right a message nowait is waiting for a message // but no one can be delivered it then need to return diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java new file mode 100644 index 0000000000..112af50190 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java @@ -0,0 +1,70 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.MessageListener; +import org.apache.qpidity.api.Message; + +/** + * This listener idspatches messaes to its browser. + */ +public class QpidBrowserListener implements MessageListener +{ + /** + * Used for debugging. + */ + private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); + + /** + * This message listener's browser. + */ + QueueBrowserImpl _browser = null; + + //---- constructor + /** + * Create a message listener wrapper for a given browser + * + * @param browser The browser of this listener + */ + public QpidBrowserListener(QueueBrowserImpl browser) + { + _browser = browser; + } + + //---- org.apache.qpidity.MessagePartListener API + /** + * Deliver a message to the listener. + * + * @param message The message delivered to the listner. + */ + public void onMessage(Message message) + { + try + { + //convert this message into a JMS one + javax.jms.Message jmsMessage = null; // todo + _browser.receiveMessage(jmsMessage); + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage()); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index ad7a777c37..29821e1b81 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -17,10 +17,18 @@ */ package org.apache.qpidity.jms; +import org.apache.qpidity.filter.JMSSelectorFilter; +import org.apache.qpidity.filter.MessageFilter; +import org.apache.qpidity.MessagePartListener; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.impl.MessagePartListenerAdapter; + import javax.jms.QueueBrowser; import javax.jms.JMSException; import javax.jms.Queue; +import javax.jms.Message; import java.util.Enumeration; +import java.util.NoSuchElementException; /** * Implementation of the JMS QueueBrowser interface @@ -32,6 +40,36 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser */ private String _messageSelector = null; + /** + * The message selector filter associated with this browser + */ + private MessageFilter _filter = null; + + /** + * The batch of messages to browse. + */ + private Message[] _messages; + + /** + * The number of messages read from current batch. + */ + private int _browsed = 0; + + /** + * The number of messages received from current batch. + */ + private int _received = 0; + + /** + * Indicates whether the last message has been received. + */ + private int _batchLength; + + /** + * The batch max size + */ + private final int _maxbatchlength = 10; + //--- constructor /** @@ -40,13 +78,26 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser * @param session The session of this browser. * @param queue The queue name for this browser * @param messageSelector only messages with properties matching the message selector expression are delivered. - * @throws JMSException In case of internal problem when creating this browser. + * @throws Exception In case of internal problem when creating this browser. */ - protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException + protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception { super(session, (DestinationImpl) queue); - _messageSelector = messageSelector; - //-- TODO: Create the QPid browser + // this is an array representing a batch of messages for this browser. + _messages = new Message[_maxbatchlength]; + if (messageSelector != null) + { + _messageSelector = messageSelector; + _filter = new JMSSelectorFilter(messageSelector); + } + MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidBrowserListener(this)); + // this is a queue we expect that this queue exists + getSession().getQpidSession() + .messageSubscribe(queue.getQueueName(), getMessageActorID(), + org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED, + // We do not acquire those messages + org.apache.qpidity.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); + } //--- javax.jms.QueueBrowser API @@ -58,10 +109,11 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser */ public Enumeration getEnumeration() throws JMSException { - // TODO - return null; + requestMessages(); + return new MessageEnumeration(); } + /** * Get the queue associated with this queue browser. * @@ -70,6 +122,7 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser */ public Queue getQueue() throws JMSException { + checkNotClosed(); return (Queue) _destination; } @@ -81,6 +134,130 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser */ public String getMessageSelector() throws JMSException { + checkNotClosed(); return _messageSelector; } + + //-- overwritten methods. + /** + * Closes the browser and deregister it from its session. + * + * @throws JMSException if the MessaeActor cannot be closed due to some internal error. + */ + public void close() throws JMSException + { + synchronized (_messages) + { + _received = 0; + _browsed = 0; + _batchLength = 0; + _messages.notify(); + } + super.close(); + } + + //-- nonpublic methods + /** + * Request _maxbatchlength messages + * + * @throws JMSException If requesting more messages fails due to some internal error. + */ + private void requestMessages() throws JMSException + { + _browsed = 0; + _received = 0; + // request messages + int received = 0; + try + { + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, + _maxbatchlength); + _batchLength = getSession().getQpidSession().messageFlush(getMessageActorID()); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * This method is invoked by the listener when a message is dispatched to this browser. + * + * @param m A received message + */ + protected void receiveMessage(Message m) + { + synchronized (_messages) + { + _messages[_received] = m; + _received++; + _messages.notify(); + } + } + + //-- inner class + /** + * This is an implementation of the Enumeration interface. + */ + private class MessageEnumeration implements Enumeration + { + /* + * Whether this enumeration has any more elements. + * + * @return True if there any more elements. + */ + public boolean hasMoreElements() + { + boolean result = false; + // Try to work out whether there are any more messages available. + try + { + if (_browsed >= _maxbatchlength) + { + requestMessages(); + } + synchronized (_messages) + { + while (_received == _browsed && _batchLength > _browsed) + { + // we expect more messages + _messages.wait(); + } + if (_browsed < _received && _batchLength != _browsed) + { + result = true; + } + } + } + catch (Exception e) + { + // If no batch could be returned, the result should be false, therefore do nothing + } + return result; + } + + /** + * Get the next message element + * + * @return The next element. + */ + public Object nextElement() + { + if (hasMoreElements()) + { + synchronized (_messages) + { + Message message = _messages[_browsed]; + _browsed = _browsed + 1; + return message; + } + } + else + { + throw new NoSuchElementException(); + } + } + } + } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 4723257794..0a75673961 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -129,8 +129,7 @@ public class SessionImpl implements Session * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to * {@link Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter is true. * @param isXA Indicates whether this session is an XA session. - * @throws JMSSecurityException If the user could not be authenticated. - * @throws QpidException In case of internal error. + * @throws QpidException In case of internal error. */ protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode, boolean isXA) throws QpidException @@ -717,7 +716,15 @@ public class SessionImpl implements Session { checkNotClosed(); checkDestination(queue); - QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector); + QueueBrowserImpl browser; + try + { + browser = new QueueBrowserImpl(this, queue, messageSelector); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } // register this actor with the session _messageActors.put(browser.getMessageActorID(), browser); return browser; |
