summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-20 14:54:01 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-20 14:54:01 +0000
commitfc6d79eb365027d1fdda43ae0081f72dd45b7896 (patch)
treeb8bd2f4f43faf9ce43438b7503e548e111f79512 /java/client
parent7ec92c0a43be9e5934b35565a7f46eb83e36f6d1 (diff)
downloadqpid-python-fc6d79eb365027d1fdda43ae0081f72dd45b7896.tar.gz
QPID-101
Initial Implementation of Queue Browsing by Robert Godfrey and Martin Ritchie AMQChannel.java - record messages browsed so not to discard them on ack. FilterManagerFactory.java - Added a NoConsumerFilter ConcurrentSelectorDeliveryManager.java - Update to send browsers messages without taking the message from other consumers Subscription.java - Added autoClose and isBrowser methods SubscriptionTestHelper.java / RemoteSubscriptionImpl.java / SubscriptionImpl.java - implemented new interface methods Added NoConsumerFilter.java Patches from Rob Godfrey for client implmentation AMQSession.java - Added AUTO_CLOSE and NO_CONSUME properties to arguments FieldTable for consume method. BasicMessageConsumer.java - updates to correctly close consumer when an BasicCancel is received from the broker. AMQProtocolSession.java - method to allow cancellation of the client AMQStateManager.java - added handler for BasicCancelOkMethodHandler.java Added new AMQQueueBrowser.java BasicCancelOkMethodHandler.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489106 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java128
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java70
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java90
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java1
6 files changed, 306 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
new file mode 100644
index 0000000000..5c753946a6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.log4j.Logger;
+
+import java.util.Enumeration;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+public class AMQQueueBrowser implements QueueBrowser
+{
+ private static final Logger _logger = Logger.getLogger(AMQQueueBrowser.class);
+
+
+ private AtomicBoolean _isClosed = new AtomicBoolean();
+ private final AMQSession _session;
+ private final AMQQueue _queue;
+ private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>();
+ private final String _messageSelector;
+
+
+ AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException
+ {
+ _session = session;
+ _queue = queue;
+ _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ consumer.close();
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ checkState();
+ return _queue;
+ }
+
+ private void checkState() throws JMSException
+ {
+ if (_isClosed.get())
+ {
+ throw new IllegalStateException("Queue Browser");
+ }
+ if (_session.isClosed())
+ {
+ throw new IllegalStateException("Session is closed");
+ }
+
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+
+ checkState();
+ return _messageSelector;
+ }
+
+ public Enumeration getEnumeration() throws JMSException
+ {
+ checkState();
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ _consumers.add(consumer);
+
+ return new Enumeration()
+ {
+
+
+ Message _nextMessage = consumer.receive();
+
+
+ public boolean hasMoreElements()
+ {
+ _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ return (_nextMessage != null);
+ }
+
+ public Object nextElement()
+ {
+ Message msg = _nextMessage;
+ try
+ {
+ _logger.info("QB:nextElement about to receive");
+
+ _nextMessage = consumer.receive();
+ _logger.info("QB:nextElement received:" + _nextMessage);
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("Exception caught while queue browsing", e);
+ _nextMessage = null;
+ }
+
+ return msg;
+ }
+ };
+ }
+
+ public void close() throws JMSException
+ {
+ for (BasicMessageConsumer consumer : _consumers)
+ {
+ consumer.close();
+ }
+ _consumers.clear();
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index bf61550cdc..2136d565f1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -51,7 +51,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -146,6 +145,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _inRecovery;
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -843,7 +843,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false,
false,
null,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -855,7 +857,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -868,7 +872,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
noLocal,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
+ }
+
+ public MessageConsumer createBrowserConsumer(Destination destination,
+ String messageSelector,
+ boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ noLocal,
+ false,
+ messageSelector,
+ null,
+ true,
+ true);
}
public MessageConsumer createConsumer(Destination destination,
@@ -878,7 +901,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
}
@@ -890,7 +913,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -902,7 +925,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -915,7 +938,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
protected MessageConsumer createConsumerImpl(final Destination destination,
@@ -924,7 +947,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
final boolean noLocal,
final boolean exclusive,
final String selector,
- final FieldTable rawSelector) throws JMSException
+ final FieldTable rawSelector,
+ final boolean noConsume,
+ final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
@@ -948,7 +973,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
- _acknowledgeMode);
+ _acknowledgeMode, noConsume, autoClose);
try
{
@@ -1082,6 +1107,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
}
+ if(consumer.isAutoClose())
+ {
+ arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+ if(consumer.isNoConsume())
+ {
+ arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
@@ -1303,16 +1336,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return createBrowser(queue, null);
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1586,6 +1617,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
+ public void confirmConsumerCancelled(String consumerTag)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ if((consumer != null) && (consumer.isAutoClose()))
+ {
+ consumer.closeWhenNoMessages(true);
+ }
+ }
+
+
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
@@ -1616,4 +1657,5 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw new javax.jms.InvalidDestinationException("Invalid Queue");
}
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index f0d3cf5abc..673321cd9d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -145,10 +145,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private Thread _receivingThread;
+ /**
+ * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+ * on the queue. This is used for queue browsing.
+ */
+ private boolean _autoClose;
+ private boolean _closeWhenNoMessages;
+
+ private boolean _noConsume;
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
- boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
- int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
+ boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+ int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -164,6 +173,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_exclusive = exclusive;
_acknowledgeMode = acknowledgeMode;
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
+ _autoClose = autoClose;
+ _noConsume = noConsume;
}
public AMQDestination getDestination()
@@ -321,6 +332,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = null;
if (l > 0)
{
@@ -350,6 +365,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+ private boolean closeOnAutoClose() throws JMSException
+ {
+ if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+ {
+ close(false);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -358,6 +386,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -402,22 +434,31 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+
public void close() throws JMSException
{
+ close(true);
+ }
+
+ public void close(boolean sendClose) throws JMSException
+ {
synchronized(_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
-
- try
+ if(sendClose)
{
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
- }
- catch (AMQException e)
- {
- _logger.error("Error closing consumer: " + e, e);
- throw new JMSException("Error closing consumer: " + e);
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+ try
+ {
+ _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error closing consumer: " + e, e);
+ throw new JMSException("Error closing consumer: " + e);
+ }
}
deregisterConsumer();
@@ -630,4 +671,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_unacknowledgedDeliveryTags.clear();
}
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+
+ public boolean isNoConsume()
+ {
+ return _noConsume;
+ }
+
+ public void closeWhenNoMessages(boolean b)
+ {
+ _closeWhenNoMessages = b;
+
+ if(_closeWhenNoMessages
+ && _synchronousQueue.isEmpty()
+ && _receiving.get()
+ && _messageListener != null)
+ {
+ _receivingThread.interrupt();
+ }
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
new file mode 100644
index 0000000000..d855e97204
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
@@ -0,0 +1,35 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class BasicCancelOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
+ private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
+
+ public static BasicCancelOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicCancelOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("New BasicCancelOk method received");
+ BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
+ evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index a4ed89719b..6a40fd3133 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -406,4 +406,12 @@ public class AMQProtocolSession implements ProtocolVersionList
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
+
+ public void confirmConsumerCancelled(int channelId, String consumerTag)
+ {
+ final Integer chId = channelId;
+ final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+
+ session.confirmConsumerCancelled(consumerTag);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 887850c06e..50bd1667f9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -103,6 +103,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+ frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());