From ba3c83a6b74d8145ece6930272b880260e613a3c Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Sun, 5 Sep 2010 18:50:31 +0000 Subject: QPID-2418: Unsubscribe existing open durable subscriptions when changing subscription. Remove duplication in implementations. Applied patch from Andrew Kennedy git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@992855 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 261 ++++++++++++++++----- .../org/apache/qpid/client/AMQSession_0_10.java | 69 ------ .../org/apache/qpid/client/AMQSession_0_8.java | 116 ++++----- .../main/java/org/apache/qpid/client/AMQTopic.java | 15 +- 4 files changed, 241 insertions(+), 220 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 55b56444de..b96b32d990 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,8 +20,49 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQDisconnectedException; @@ -45,7 +86,6 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; @@ -56,25 +96,8 @@ import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; -import org.apache.qpid.url.AMQBindingURL; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** *

@@ -172,7 +195,7 @@ public abstract class AMQSession _thisSession = this; /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -261,15 +284,22 @@ public abstract class AMQSession _subscriptions = - new ConcurrentHashMap(); + protected final ConcurrentHashMap> _subscriptions = + new ConcurrentHashMap>(); /** * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - protected final ConcurrentHashMap _reverseSubscriptionMap = - new ConcurrentHashMap(); + protected final ConcurrentHashMap _reverseSubscriptionMap = new ConcurrentHashMap(); + + /** + * Locks to keep access to subscriber details atomic. + *

+ * Added for QPID2418 + */ + protected final Lock _subscriberDetails = new ReentrantLock(true); + protected final Lock _subscriberAccess = new ReentrantLock(true); /** * Used to hold incoming messages. @@ -311,9 +341,6 @@ public abstract class AMQSession _consumers = new IdToConsumerMap(); - //Map _consumers = - //new ConcurrentHashMap(); - /** * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes @@ -670,9 +697,9 @@ public abstract class AMQSession subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + + // Not subscribed to this name in the current session + if (subscriber == null) + { + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName = ((AMQTopic) topic).getRoutingKey(); + } else + { + topicName = new AMQShortString(topic.getTopicName()); + } + + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + + "' for creation durableSubscriber. Requesting queue deletion regardless."); + } + + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } + } + else + { + // Subscribed with the same topic and no current / previous or same selector + if (subscriber.getTopic().equals(topic) + && ((messageSelector == null && subscriber.getMessageSelector() == null) + || (messageSelector != null && messageSelector.equals(subscriber.getMessageSelector())))) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name + + (messageSelector != null ? " and selector " + messageSelector : "")); + } + else + { + unsubscribe(name, true); + } + } + + _subscriberAccess.lock(); + try + { + C consumer = (C) createConsumer(dest, messageSelector, noLocal); + subscriber = new TopicSubscriberAdaptor(dest, consumer); - return subscriber; + // Save subscription information + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + } + finally + { + _subscriberAccess.unlock(); + } + + return subscriber; + } + finally + { + _subscriberDetails.unlock(); + } } public MapMessage createMapMessage() throws JMSException @@ -1732,23 +1835,51 @@ public abstract class AMQSession subscriber; + + _subscriberDetails.lock(); + try + { + checkNotClosed(); + subscriber = _subscriptions.get(name); + if (subscriber != null) + { + // Remove saved subscription information + _subscriptions.remove(name); + _reverseSubscriptionMap.remove(subscriber.getMessageConsumer()); + } + } + finally + { + _subscriberDetails.unlock(); + } + if (subscriber != null) { subscriber.close(); + // send a queue.delete for the subscription deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); - _subscriptions.remove(name); - _reverseSubscriptionMap.remove(subscriber); } else { @@ -1763,7 +1894,7 @@ public abstract class AMQSession subscriber=_subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName=((AMQTopic) topic).getBindingKeys()[0]; - } - else - { - topicName=new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } - protected Long requestQueueDepth(AMQDestination amqd) { return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 939e29e4d5..89fbd66e71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,31 +21,63 @@ package org.apache.qpid.client; -import javax.jms.*; -import javax.jms.IllegalStateException; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.ReturnMessage; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicAckBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.framing.TxRollbackBody; +import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - public final class AMQSession_0_8 extends AMQSession { @@ -442,74 +474,6 @@ public final class AMQSession_0_8 extends AMQSession subscriber = _subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName = ((AMQTopic) topic).getRoutingKey(); - } - else - { - topicName = new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } - - - - public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException { new FailoverRetrySupport( diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 5f3f4e7d19..2704393cc2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -46,11 +46,6 @@ public class AMQTopic extends AMQDestination implements Topic super(binding); } -// public AMQTopic(String exchangeName, String routingKey) -// { -// this(new AMQShortString(exchangeName), new AMQShortString(routingKey)); -// } - public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) { super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); @@ -79,8 +74,7 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { - super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, - queueName, isDurable); + super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); } protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, @@ -103,13 +97,6 @@ public class AMQTopic extends AMQDestination implements Topic true); } - public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection) - throws JMSException - { - return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false, - getDurableTopicQueueName(subscriptionName, connection), true); - } - public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { return new AMQShortString(connection.getClientID() + ":" + subscriptionName); -- cgit v1.2.1

CRC Card