summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-15 14:12:24 +0000
committerRobert Greig <rgreig@apache.org>2006-12-15 14:12:24 +0000
commit75793016c7cc8fc156411041e4399243aadc563e (patch)
tree418a201fa99bf12b32f5b77722ce92ed34e66e2f /java/client/src/main
parent6ca88beeea89e37ec725e5e99bada7ae48d2870f (diff)
downloadqpid-python-75793016c7cc8fc156411041e4399243aadc563e.tar.gz
QPID-183 Patch supplied by Rob Godfrey. Major changes to durable topic subscription handling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487562 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java125
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java6
8 files changed, 280 insertions, 51 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 5c13e7861f..c6f3f9c492 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,13 +79,19 @@ public abstract class AMQDestination implements Destination, Referenceable
protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
boolean isAutoDelete, String queueName)
{
+ this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false);
+ }
+
+ protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
+ boolean isAutoDelete, String queueName, boolean isDurable)
+ {
if (destinationName == null)
{
- throw new IllegalArgumentException("Destination name must not be null");
+ throw new IllegalArgumentException("Destination exchange must not be null");
}
if (exchangeName == null)
{
- throw new IllegalArgumentException("Exchange name must not be null");
+ throw new IllegalArgumentException("Exchange exchange must not be null");
}
if (exchangeClass == null)
{
@@ -97,6 +103,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_isExclusive = isExclusive;
_isAutoDelete = isAutoDelete;
_queueName = queueName;
+ _isDurable = isDurable;
}
public String getEncodedName()
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0cbf5bac60..d149683646 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -23,13 +23,15 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.server.handler.ExchangeBoundHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
@@ -66,6 +68,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
/**
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled
+ * correctly. Note this only keeps a record of subscriptions which have been created
+ * in the current instance. It does not remember subscriptions between executions of the
+ * client
+ */
+ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
+ new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+
+ /**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
* feature.
*/
@@ -1087,19 +1098,53 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- * If a name is reused in creating a new subscriber with a different topic/selecto or no-local
- * flag then the subcriber will receive messages matching the old subscription AND the new one.
- * The spec states that the new one should replace the old one.
- * TODO: fix it.
- */
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " +
+ name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getQueueName()) &&
+ !isQueueBound(dest.getQueueName(), topic.getTopicName()))
+ {
+ deleteSubscriptionQueue(dest.getQueueName());
+ }
+ }
+
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ _subscriptions.put(name,subscriber);
+
+ return subscriber;
+ }
+
+ private void deleteSubscriptionQueue(String queueName) throws JMSException
+ {
+ try
+ {
+ AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
+ false, true);
+ _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
/**
@@ -1110,9 +1155,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- return new TopicSubscriberAdaptor(dest, consumer);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name,subscriber);
+ return subscriber;
}
public TopicPublisher createPublisher(Topic topic) throws JMSException
@@ -1150,20 +1197,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void unsubscribe(String name) throws JMSException
{
checkNotClosed();
-
- String queue = _connection.getClientID() + ":" + name;
-
- AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null);
-
- try {
- AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class);
- // if this method doen't throw an exception means we have received a queue declare ok.
- } catch (AMQException e) {
- throw new javax.jms.InvalidDestinationException("This destination doesn't exist");
- }
- //send a queue.delete for the subscription
- AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true);
- _connection.getProtocolHandler().writeFrame(frame);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ // send a queue.delete for the subscription
+ deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ _subscriptions.remove(name);
+ }
+ else
+ {
+ if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
+ {
+ deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ }
+ else
+ {
+ throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+ }
+ }
+ }
+
+ private boolean isQueueBound(String queueName) throws JMSException
+ {
+ return isQueueBound(queueName, null);
+ }
+
+ private boolean isQueueBound(String queueName, String routingKey) throws JMSException
+ {
+ AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
+ routingKey, queueName);
+ AMQMethodEvent response = null;
+ try
+ {
+ response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+ return (responseBody.replyCode == ExchangeBoundHandler.OK);
}
private void checkTransacted() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 4dd38eea18..39304f3f4c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,20 +40,25 @@ public class AMQTopic extends AMQDestination implements Topic
public AMQTopic(String name)
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, true, null);
- _isDurable = false;
+ this(name, true, null, false);
}
- /**
- * Constructor for use in creating a topic to represent a durable subscription
- * @param topic
- * @param clientId
- * @param subscriptionName
- */
- public AMQTopic(AMQTopic topic, String clientId, String subscriptionName)
+ public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+ {
+ super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
+ queueName, isDurable);
+ }
+
+ public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ throws JMSException
+ {
+ return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection),
+ true);
+ }
+
+ public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getDestinationName(), true, false, clientId + ":" + subscriptionName);
- _isDurable = true;
+ return connection.getClientID() + ":" + subscriptionName;
}
public String getTopicName() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 4fb62b49fc..2448e14542 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -303,6 +303,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
+ _logger.warn("Interrupted: " + e, e);
return null;
}
finally
@@ -531,18 +532,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
public void setConsumerTag(String consumerTag)
- {
+ {
_consumerTag = consumerTag;
}
public AMQSession getSession() {
return _session;
}
-
+
private void checkPreConditions() throws JMSException{
-
+
this.checkNotClosed();
-
+
if(_session == null || _session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
new file mode 100644
index 0000000000..34ec49436e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSAMQException extends JMSException
+{
+ public JMSAMQException(AMQException s)
+ {
+ super(s.getMessage(), String.valueOf(s.getErrorCode()));
+ setLinkedException(s);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
new file mode 100644
index 0000000000..858726745e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class);
+ private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
+
+ public static ExchangeBoundOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
+ _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " +
+ body.replyText);
+ }
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
new file mode 100644
index 0000000000..3271a715a2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
+ private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
+
+ public static QueueDeleteOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueDeleteOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
+ _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+ }
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index ab707bb51d..887850c06e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -104,6 +104,8 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
+ frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
}