From 030bf18c007916798faf8e32a787bcc01919a6f0 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 2 Nov 2012 16:39:57 +0000 Subject: QPID-4411 : QPID JMS QueueBrowser should defer getting messages until getEnumeration() is called. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1405042 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java | 133 ++++++++++++++------- .../org/apache/qpid/amqp_1_0/client/Receiver.java | 31 +++++ 2 files changed, 119 insertions(+), 45 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java index 527e82eaed..914ec4bd0f 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java @@ -18,9 +18,7 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; +import java.util.*; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.qpid.amqp_1_0.client.AcknowledgeMode; @@ -29,6 +27,7 @@ import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.jms.QueueBrowser; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.messaging.Filter; import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter; import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; @@ -39,49 +38,27 @@ public class QueueBrowserImpl implements QueueBrowser private static final String JMS_SELECTOR = "jms-selector"; private QueueImpl _queue; private String _selector; - private Receiver _receiver; - private Message _nextElement; - private MessageEnumeration _enumeration; + private final SessionImpl _session; + private Map _filters; + private HashSet _enumerations = new HashSet(); + private boolean _closed; QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException { _queue = queue; _selector = selector; + _session = session; - Map filters; if(selector == null || selector.trim().equals("")) { - filters = null; + _filters = null; } else { - filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector)); - } - - - try - { - _receiver = session.getClientSession().createReceiver(queue.getAddress(), - StdDistMode.COPY, - AcknowledgeMode.AMO,null, - false, - filters, null); - _nextElement = _receiver.receive(0L); - _enumeration = new MessageEnumeration(); - } - catch(AmqpErrorException e) - { - org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError(); - if(AmqpError.INVALID_FIELD.equals(error.getCondition())) - { - throw new InvalidSelectorException(e.getMessage()); - } - else - { - throw new JMSException(e.getMessage(), error.getCondition().getValue().toString()); - } - + _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector)); + // We do this just to have the server validate the filter.. + new MessageEnumeration().close(); } } @@ -97,42 +74,108 @@ public class QueueBrowserImpl implements QueueBrowser public Enumeration getEnumeration() throws JMSException { - if(_enumeration == null) + if(_closed) { throw new IllegalStateException("Browser has been closed"); } - return _enumeration; + return new MessageEnumeration(); } public void close() throws JMSException { - _receiver.close(); - _enumeration = null; + _closed = true; + for(MessageEnumeration me : new ArrayList(_enumerations)) + { + me.close(); + } } private final class MessageEnumeration implements Enumeration { + private Receiver _receiver; + private Message _nextElement; + private boolean _needNext = true; + + MessageEnumeration() throws JMSException + { + try + { + _receiver = _session.getClientSession().createReceiver(_queue.getAddress(), + StdDistMode.COPY, + AcknowledgeMode.AMO, null, + false, + _filters, null); + _receiver.setCredit(UnsignedInteger.valueOf(100), true); + } + catch(AmqpErrorException e) + { + org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError(); + if(AmqpError.INVALID_FIELD.equals(error.getCondition())) + { + throw new InvalidSelectorException(e.getMessage()); + } + else + { + throw new JMSException(e.getMessage(), error.getCondition().getValue().toString()); + } + + } + _enumerations.add(this); + + } + + public void close() + { + _enumerations.remove(this); + _receiver.close(); + _receiver = null; + } @Override public boolean hasMoreElements() { + if( _receiver == null ) + { + return false; + } + if( _needNext ) + { + _needNext = false; + _nextElement = _receiver.receive(0L); + if( _nextElement == null ) + { + // Drain to verify there really are no more messages. + _receiver.drain(); + _receiver.drainWait(); + _nextElement = _receiver.receive(0L); + if( _nextElement == null ) + { + close(); + } + else + { + // there are still more messages, open up the credit window again.. + _receiver.clearDrain(); + } + } + } return _nextElement != null; } @Override public Message nextElement() { - - Message message = _nextElement; - if(message == null) + if( hasMoreElements() ) { - message = _receiver.receive(0l); + Message message = _nextElement; + _nextElement = null; + _needNext = true; + return message; } - if(message != null) + else { - _nextElement = _receiver.receive(0l); + throw new NoSuchElementException(); } - return message; } } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index 581744778e..8b792db1f1 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -503,6 +503,37 @@ public class Receiver implements DeliveryStateHandler _endpoint.drain(); } + /** + * Waits for the receiver to drain or a message to be available to be received. + * @return true if the receiver has been drained. + */ + public boolean drainWait() + { + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + try + { + while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() ) + { + lock.wait(); + } + } + catch (InterruptedException e) + { + } + } + return _prefetchQueue.peek()==null && _endpoint.isDrained(); + } + + /** + * Clears the receiver drain so that message delivery can resume. + */ + public void clearDrain() + { + _endpoint.clearDrain(); + } + public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn) { _endpoint.setLinkCredit(credit); -- cgit v1.2.1