diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-15 14:12:24 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-15 14:12:24 +0000 |
| commit | 75793016c7cc8fc156411041e4399243aadc563e (patch) | |
| tree | 418a201fa99bf12b32f5b77722ce92ed34e66e2f /java/client/src/main | |
| parent | 6ca88beeea89e37ec725e5e99bada7ae48d2870f (diff) | |
| download | qpid-python-75793016c7cc8fc156411041e4399243aadc563e.tar.gz | |
QPID-183 Patch supplied by Rob Godfrey. Major changes to durable topic subscription handling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487562 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
8 files changed, 280 insertions, 51 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 5c13e7861f..c6f3f9c492 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -79,13 +79,19 @@ public abstract class AMQDestination implements Destination, Referenceable protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, boolean isAutoDelete, String queueName) { + this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false); + } + + protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, + boolean isAutoDelete, String queueName, boolean isDurable) + { if (destinationName == null) { - throw new IllegalArgumentException("Destination name must not be null"); + throw new IllegalArgumentException("Destination exchange must not be null"); } if (exchangeName == null) { - throw new IllegalArgumentException("Exchange name must not be null"); + throw new IllegalArgumentException("Exchange exchange must not be null"); } if (exchangeClass == null) { @@ -97,6 +103,7 @@ public abstract class AMQDestination implements Destination, Referenceable _isExclusive = isExclusive; _isAutoDelete = isAutoDelete; _queueName = queueName; + _isDurable = isDurable; } public String getEncodedName() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0cbf5bac60..d149683646 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -23,13 +23,15 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.server.handler.ExchangeBoundHandler; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; @@ -66,6 +68,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; /** + * Used to reference durable subscribers so they requests for unsubscribe can be handled + * correctly. Note this only keeps a record of subscriptions which have been created + * in the current instance. It does not remember subscriptions between executions of the + * client + */ + private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = + new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); + + /** * Used in the consume method. We generate the consume tag on the client so that we can use the nowait * feature. */ @@ -1087,19 +1098,53 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } - /** - * Note, currently this does not handle reuse of the same name with different topics correctly. - * If a name is reused in creating a new subscriber with a different topic/selecto or no-local - * flag then the subcriber will receive messages matching the old subscription AND the new one. - * The spec states that the new one should replace the old one. - * TODO: fix it. - */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkNotClosed(); checkValidTopic(topic); - AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) + { + if (subscriber.getTopic().equals(topic)) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); + } + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getQueueName()) && + !isQueueBound(dest.getQueueName(), topic.getTopicName())) + { + deleteSubscriptionQueue(dest.getQueueName()); + } + } + + subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + _subscriptions.put(name,subscriber); + + return subscriber; + } + + private void deleteSubscriptionQueue(String queueName) throws JMSException + { + try + { + AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false, + false, true); + _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } /** @@ -1110,9 +1155,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkValidTopic(topic); - AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - return new TopicSubscriberAdaptor(dest, consumer); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + _subscriptions.put(name,subscriber); + return subscriber; } public TopicPublisher createPublisher(Topic topic) throws JMSException @@ -1150,20 +1197,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void unsubscribe(String name) throws JMSException { checkNotClosed(); - - String queue = _connection.getClientID() + ":" + name; - - AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null); - - try { - AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class); - // if this method doen't throw an exception means we have received a queue declare ok. - } catch (AMQException e) { - throw new javax.jms.InvalidDestinationException("This destination doesn't exist"); - } - //send a queue.delete for the subscription - AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true); - _connection.getProtocolHandler().writeFrame(frame); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) + { + // send a queue.delete for the subscription + deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + _subscriptions.remove(name); + } + else + { + if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection))) + { + deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + } + else + { + throw new InvalidDestinationException("Unknown subscription exchange:" + name); + } + } + } + + private boolean isQueueBound(String queueName) throws JMSException + { + return isQueueBound(queueName, null); + } + + private boolean isQueueBound(String queueName, String routingKey) throws JMSException + { + AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME, + routingKey, queueName); + AMQMethodEvent response = null; + try + { + response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + return (responseBody.replyCode == ExchangeBoundHandler.OK); } private void checkTransacted() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 4dd38eea18..39304f3f4c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,20 +40,25 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(String name) { - super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, true, null); - _isDurable = false; + this(name, true, null, false); } - /** - * Constructor for use in creating a topic to represent a durable subscription - * @param topic - * @param clientId - * @param subscriptionName - */ - public AMQTopic(AMQTopic topic, String clientId, String subscriptionName) + public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable) + { + super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, + queueName, isDurable); + } + + public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) + throws JMSException + { + return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection), + true); + } + + public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { - super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getDestinationName(), true, false, clientId + ":" + subscriptionName); - _isDurable = true; + return connection.getClientID() + ":" + subscriptionName; } public String getTopicName() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4fb62b49fc..2448e14542 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -303,6 +303,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { + _logger.warn("Interrupted: " + e, e); return null; } finally @@ -531,18 +532,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } public void setConsumerTag(String consumerTag) - { + { _consumerTag = consumerTag; } public AMQSession getSession() { return _session; } - + private void checkPreConditions() throws JMSException{ - + this.checkNotClosed(); - + if(_session == null || _session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); } diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java new file mode 100644 index 0000000000..34ec49436e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + +/** + * @author Apache Software Foundation + */ +public class JMSAMQException extends JMSException +{ + public JMSAMQException(AMQException s) + { + super(s.getMessage(), String.valueOf(s.getErrorCode())); + setLinkedException(s); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java new file mode 100644 index 0000000000..858726745e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -0,0 +1,54 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ExchangeBoundOkBody; + +/** + * @author Apache Software Foundation + */ +public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class); + private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler(); + + public static ExchangeBoundOkMethodHandler getInstance() + { + return _instance; + } + + private ExchangeBoundOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + if (_logger.isDebugEnabled()) + { + ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod(); + _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " + + body.replyText); + } + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java new file mode 100644 index 0000000000..3271a715a2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.log4j.Logger; + +/** + * @author Apache Software Foundation + */ +public class QueueDeleteOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class); + private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler(); + + public static QueueDeleteOkMethodHandler getInstance() + { + return _instance; + } + + private QueueDeleteOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + if (_logger.isDebugEnabled()) + { + QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod(); + _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount); + } + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index ab707bb51d..887850c06e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -104,6 +104,8 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); + frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); + frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap); } |
