diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-12-20 14:54:01 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-12-20 14:54:01 +0000 |
commit | fc6d79eb365027d1fdda43ae0081f72dd45b7896 (patch) | |
tree | b8bd2f4f43faf9ce43438b7503e548e111f79512 /java/client | |
parent | 7ec92c0a43be9e5934b35565a7f46eb83e36f6d1 (diff) | |
download | qpid-python-fc6d79eb365027d1fdda43ae0081f72dd45b7896.tar.gz |
QPID-101
Initial Implementation of Queue Browsing by Robert Godfrey and Martin Ritchie
AMQChannel.java - record messages browsed so not to discard them on ack.
FilterManagerFactory.java - Added a NoConsumerFilter
ConcurrentSelectorDeliveryManager.java - Update to send browsers messages without taking the message from other consumers
Subscription.java - Added autoClose and isBrowser methods
SubscriptionTestHelper.java / RemoteSubscriptionImpl.java / SubscriptionImpl.java - implemented new interface methods
Added NoConsumerFilter.java
Patches from Rob Godfrey for client implmentation
AMQSession.java - Added AUTO_CLOSE and NO_CONSUME properties to arguments FieldTable for consume method.
BasicMessageConsumer.java - updates to correctly close consumer when an BasicCancel is received from the broker.
AMQProtocolSession.java - method to allow cancellation of the client
AMQStateManager.java - added handler for BasicCancelOkMethodHandler.java
Added new AMQQueueBrowser.java BasicCancelOkMethodHandler.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489106 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
6 files changed, 306 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java new file mode 100644 index 0000000000..5c753946a6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.log4j.Logger; + +import java.util.Enumeration; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.*; +import javax.jms.IllegalStateException; + +public class AMQQueueBrowser implements QueueBrowser +{ + private static final Logger _logger = Logger.getLogger(AMQQueueBrowser.class); + + + private AtomicBoolean _isClosed = new AtomicBoolean(); + private final AMQSession _session; + private final AMQQueue _queue; + private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>(); + private final String _messageSelector; + + + AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException + { + _session = session; + _queue = queue; + _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector; + BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + consumer.close(); + } + + public Queue getQueue() throws JMSException + { + checkState(); + return _queue; + } + + private void checkState() throws JMSException + { + if (_isClosed.get()) + { + throw new IllegalStateException("Queue Browser"); + } + if (_session.isClosed()) + { + throw new IllegalStateException("Session is closed"); + } + + } + + public String getMessageSelector() throws JMSException + { + + checkState(); + return _messageSelector; + } + + public Enumeration getEnumeration() throws JMSException + { + checkState(); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + _consumers.add(consumer); + + return new Enumeration() + { + + + Message _nextMessage = consumer.receive(); + + + public boolean hasMoreElements() + { + _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + return (_nextMessage != null); + } + + public Object nextElement() + { + Message msg = _nextMessage; + try + { + _logger.info("QB:nextElement about to receive"); + + _nextMessage = consumer.receive(); + _logger.info("QB:nextElement received:" + _nextMessage); + } + catch (JMSException e) + { + _logger.warn("Exception caught while queue browsing", e); + _nextMessage = null; + } + + return msg; + } + }; + } + + public void close() throws JMSException + { + for (BasicMessageConsumer consumer : _consumers) + { + consumer.close(); + } + _consumers.clear(); + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index bf61550cdc..2136d565f1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -51,7 +51,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -146,6 +145,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _inRecovery; + /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -843,7 +843,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false, null, - null); + null, + false, + false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -855,7 +857,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false, messageSelector, - null); + null, + false, + false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -868,7 +872,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi noLocal, false, messageSelector, - null); + null, + false, + false); + } + + public MessageConsumer createBrowserConsumer(Destination destination, + String messageSelector, + boolean noLocal) + throws JMSException + { + checkValidDestination(destination); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + noLocal, + false, + messageSelector, + null, + true, + true); } public MessageConsumer createConsumer(Destination destination, @@ -878,7 +901,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); } @@ -890,7 +913,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); } public MessageConsumer createConsumer(Destination destination, @@ -902,7 +925,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, - selector, rawSelector); + selector, rawSelector, false, false); } public MessageConsumer createConsumer(Destination destination, @@ -915,7 +938,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, - selector, rawSelector); + selector, rawSelector, false, false); } protected MessageConsumer createConsumerImpl(final Destination destination, @@ -924,7 +947,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final boolean noLocal, final boolean exclusive, final String selector, - final FieldTable rawSelector) throws JMSException + final FieldTable rawSelector, + final boolean noConsume, + final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -948,7 +973,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, - _acknowledgeMode); + _acknowledgeMode, noConsume, autoClose); try { @@ -1082,6 +1107,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); } + if(consumer.isAutoClose()) + { + arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + } + if(consumer.isNoConsume()) + { + arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + } consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening @@ -1303,16 +1336,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueBrowser createBrowser(Queue queue) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); - throw new UnsupportedOperationException("Queue browsing not supported"); + return createBrowser(queue, null); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); checkValidQueue(queue); - throw new UnsupportedOperationException("Queue browsing not supported"); + return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1586,6 +1617,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(channelFlowFrame); } + public void confirmConsumerCancelled(String consumerTag) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + if((consumer != null) && (consumer.isAutoClose())) + { + consumer.closeWhenNoMessages(true); + } + } + + /* * I could have combined the last 3 methods, but this way it improves readability */ @@ -1616,4 +1657,5 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw new javax.jms.InvalidDestinationException("Invalid Queue"); } } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index f0d3cf5abc..673321cd9d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -145,10 +145,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private Thread _receivingThread; + /** + * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive + * on the queue. This is used for queue browsing. + */ + private boolean _autoClose; + private boolean _closeWhenNoMessages; + + private boolean _noConsume; + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, - boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, - int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) + boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, + int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -164,6 +173,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _exclusive = exclusive; _acknowledgeMode = acknowledgeMode; _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); + _autoClose = autoClose; + _noConsume = noConsume; } public AMQDestination getDestination() @@ -321,6 +332,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + if(closeOnAutoClose()) + { + return null; + } Object o = null; if (l > 0) { @@ -350,6 +365,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + private boolean closeOnAutoClose() throws JMSException + { + if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) + { + close(false); + return true; + } + else + { + return false; + } + } + public Message receiveNoWait() throws JMSException { checkPreConditions(); @@ -358,6 +386,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + if(closeOnAutoClose()) + { + return null; + } Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -402,22 +434,31 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + public void close() throws JMSException { + close(true); + } + + public void close(boolean sendClose) throws JMSException + { synchronized(_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); - - try + if(sendClose) { - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); - } - catch (AMQException e) - { - _logger.error("Error closing consumer: " + e, e); - throw new JMSException("Error closing consumer: " + e); + final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); + + try + { + _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + } + catch (AMQException e) + { + _logger.error("Error closing consumer: " + e, e); + throw new JMSException("Error closing consumer: " + e); + } } deregisterConsumer(); @@ -630,4 +671,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _unacknowledgedDeliveryTags.clear(); } + + public boolean isAutoClose() + { + return _autoClose; + } + + + public boolean isNoConsume() + { + return _noConsume; + } + + public void closeWhenNoMessages(boolean b) + { + _closeWhenNoMessages = b; + + if(_closeWhenNoMessages + && _synchronousQueue.isEmpty() + && _receiving.get() + && _messageListener != null) + { + _receivingThread.interrupt(); + } + + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java new file mode 100644 index 0000000000..d855e97204 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -0,0 +1,35 @@ +package org.apache.qpid.client.handler; + +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.log4j.Logger; + +/** + * @author Apache Software Foundation + */ +public class BasicCancelOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("New BasicCancelOk method received"); + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); + evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index a4ed89719b..6a40fd3133 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -406,4 +406,12 @@ public class AMQProtocolSession implements ProtocolVersionList HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } + + public void confirmConsumerCancelled(int channelId, String consumerTag) + { + final Integer chId = channelId; + final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); + + session.confirmConsumerCancelled(consumerTag); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 887850c06e..50bd1667f9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -103,6 +103,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); + frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); |