summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-20 14:54:01 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-20 14:54:01 +0000
commitfc6d79eb365027d1fdda43ae0081f72dd45b7896 (patch)
treeb8bd2f4f43faf9ce43438b7503e548e111f79512 /java/broker
parent7ec92c0a43be9e5934b35565a7f46eb83e36f6d1 (diff)
downloadqpid-python-fc6d79eb365027d1fdda43ae0081f72dd45b7896.tar.gz
QPID-101
Initial Implementation of Queue Browsing by Robert Godfrey and Martin Ritchie AMQChannel.java - record messages browsed so not to discard them on ack. FilterManagerFactory.java - Added a NoConsumerFilter ConcurrentSelectorDeliveryManager.java - Update to send browsers messages without taking the message from other consumers Subscription.java - Added autoClose and isBrowser methods SubscriptionTestHelper.java / RemoteSubscriptionImpl.java / SubscriptionImpl.java - implemented new interface methods Added NoConsumerFilter.java Patches from Rob Godfrey for client implmentation AMQSession.java - Added AUTO_CLOSE and NO_CONSUME properties to arguments FieldTable for consume method. BasicMessageConsumer.java - updates to correctly close consumer when an BasicCancel is received from the broker. AMQProtocolSession.java - method to allow cancellation of the client AMQStateManager.java - added handler for BasicCancelOkMethodHandler.java Added new AMQQueueBrowser.java BasicCancelOkMethodHandler.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489106 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java153
6 files changed, 243 insertions, 49 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3081181c80..c5b45659cf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -46,6 +46,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -110,6 +112,7 @@ public class AMQChannel
private TxAck ackOp;
private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
+ private Set<Long> _browsedAcks = new HashSet<Long>();
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
@@ -555,7 +558,14 @@ public class AMQChannel
for (UnacknowledgedMessage msg : acked)
{
- msg.discard();
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ msg.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
}
}
@@ -572,7 +582,16 @@ public class AMQChannel
_log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
}
- msg.discard();
+
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ msg.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+
if (_log.isTraceEnabled())
{
_log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
@@ -693,6 +712,12 @@ public class AMQChannel
_returns.clear();
}
+ public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+ {
+ _browsedAcks.add(deliveryTag);
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
//we use this wrapper to ensure we are always using the correct
//map instance (its not final unfortunately)
private class AckMap implements UnacknowledgedMessageMap
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
index 6ecd56586f..49f99132ef 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
@@ -34,7 +35,6 @@ public class FilterManagerFactory
private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class);
//fixme move to a common class so it can be refered to from client code.
- private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector";
public static FilterManager createManager(FieldTable filters) throws AMQException
{
@@ -51,7 +51,7 @@ public class FilterManagerFactory
{
String key = (String) it.next();
_logger.info("filter:" + key);
- if (key.equals(JMS_SELECTOR_FILTER))
+ if (key.equals(AMQPFilterTypes.JMS_SELECTOR.getValue()))
{
String selector = (String) filters.get(key);
@@ -61,6 +61,11 @@ public class FilterManagerFactory
}
}
+ if (key.equals(AMQPFilterTypes.NO_CONSUME.getValue()))
+ {
+ manager.add(new NoConsumerFilter());
+ }
+
}
//If we added no filters don't bear the overhead of having an filter manager
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
new file mode 100644
index 0000000000..283d324ff6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.filter.jms.selector.SelectorParser;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.log4j.Logger;
+
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+public class NoConsumerFilter implements MessageFilter
+{
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(NoConsumerFilter.class);
+
+
+ public NoConsumerFilter() throws AMQException
+ {
+ _logger.info("Created NoConsumerFilter");
+ }
+
+ public boolean matches(AMQMessage message)
+ {
+ return true;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index f99f2d78b7..2100734ada 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -164,7 +164,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
subscription.enqueueForPreDelivery(message);
}
- }
+ }
}
public synchronized void removeAMessageFromTop() throws AMQException
@@ -187,11 +187,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
{
AMQMessage message = messages.peek();
- while (message != null && message.taken())
+ while (message != null && (sub.isBrowser() || message.taken()))
{
//remove the already taken message
messages.poll();
@@ -201,12 +201,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
- public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue, AMQQueue queue)
+ public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
{
AMQMessage message = null;
try
{
- message = getNextMessage(messageQueue);
+ message = getNextMessage(messageQueue, sub);
// message will be null if we have no messages in the messageQueue.
if (message == null)
@@ -215,7 +215,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
_log.info("Async Delivery Message:" + message + " to :" + sub);
- sub.send(message, queue);
+ sub.send(message, _queue);
//remove sent message from our queue.
messageQueue.poll();
@@ -244,21 +244,33 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!sub.isSuspended())
{
- if (sub.hasFilters())
- {
- sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue);
- }
- else
- {
- sendNextMessage(sub, _messages, _queue);
- }
-
+ sendNextMessage(sub);
+
hasSubscribers = true;
}
}
}
}
+ private void sendNextMessage(Subscription sub)
+ {
+ if (sub.hasFilters())
+ {
+ sendNextMessage(sub, sub.getPreDeliveryQueue());
+ if (sub.isAutoClose())
+ {
+ if (sub.getPreDeliveryQueue().isEmpty())
+ {
+ sub.close();
+ }
+ }
+ }
+ else
+ {
+ sendNextMessage(sub, _messages);
+ }
+ }
+
private AMQMessage poll()
{
return _messages.poll();
@@ -359,9 +371,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void processAsync(Executor executor)
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
+ _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index 523b5f06e9..a5672f2b19 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -40,4 +40,9 @@ public interface Subscription
void enqueueForPreDelivery(AMQMessage msg);
+ boolean isAutoClose();
+
+ void close();
+
+ boolean isBrowser();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index f4e7482396..4272541298 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -24,11 +24,13 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -64,6 +66,9 @@ public class SubscriptionImpl implements Subscription
*/
private final boolean _acks;
private FilterManager _filters;
+ private final boolean _isBrowser;
+ private final Boolean _autoClose;
+ private boolean _closed = false;
public static class Factory implements SubscriptionFactory
{
@@ -105,9 +110,48 @@ public class SubscriptionImpl implements Subscription
_filters = FilterManagerFactory.createManager(filters);
+
+ if (_filters != null)
+ {
+ Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+ if (isBrowser != null)
+ {
+ _isBrowser = (Boolean) isBrowser;
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+
+
+ if (_filters != null)
+ {
+ Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+
+
if (_filters != null)
{
_messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+
}
else
{
@@ -116,6 +160,7 @@ public class SubscriptionImpl implements Subscription
}
}
+
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
String consumerTag)
throws AMQException
@@ -160,44 +205,78 @@ public class SubscriptionImpl implements Subscription
{
if (msg != null)
{
- try
+ if (_isBrowser)
+ {
+ sendToBrowser(msg, queue);
+ }
+ else
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ sendToConsumer(msg, queue);
+ }
+ }
+ else
+ {
+ _logger.error("Attempt to send Null message", new NullPointerException());
+ }
+ }
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
- {
- queue.dequeue(msg);
- }
- synchronized(channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
+ // received the message. If it is lost in transit that is not important.
+ if (_acks)
+ {
+ channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
+ protocolSession.writeFrame(frame);
+ }
+ }
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ try
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- protocolSession.writeFrame(frame);
- }
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
+ {
+ queue.dequeue(msg);
}
- finally
+ synchronized(channel)
{
- msg.setDeliveredToConsumer();
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+ protocolSession.writeFrame(frame);
}
}
- else
+ finally
{
- _logger.error("Attempt to send Null message", new NullPointerException());
+ msg.setDeliveredToConsumer();
}
}
@@ -290,6 +369,26 @@ public class SubscriptionImpl implements Subscription
}
}
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public void close()
+ {
+ if (!_closed)
+ {
+ _logger.info("Closing autoclose subscription:" + this);
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ _closed = true;
+ }
+ }
+
+ public boolean isBrowser()
+ {
+ return _isBrowser;
+ }
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{