summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-05-22 13:24:13 +0000
committerAidan Skinner <aidan@apache.org>2008-05-22 13:24:13 +0000
commit363ceb155f788fe414b29d4e10e5533bac7e7c73 (patch)
treef28bcb885b948575b0f7e55b5b789518d334e0f7 /java/client/src/main
parentac17b670c3d5fe76fcb65d01fa2cf76fe514406c (diff)
downloadqpid-python-363ceb155f788fe414b29d4e10e5533bac7e7c73.tar.gz
QPID-1085: If an error occurs creating a durable subscriber with a selector delete the queue that was created.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659105 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java30
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java20
2 files changed, 33 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 404c0cd381..b747e31016 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -28,8 +28,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,7 +58,6 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -80,12 +79,12 @@ import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -927,12 +926,21 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
checkNotClosed();
checkValidTopic(topic);
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ try
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
- return subscriber;
+ return subscriber;
+ }
+ catch (JMSException e)
+ {
+ deleteQueue(dest.getAMQQueueName());
+ throw e;
+ }
+
}
public MapMessage createMapMessage() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index cd1aa5aa8c..d071bcf0c2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -130,18 +130,26 @@ public class AMQSession_0_10 extends AMQSession
{
checkNotClosed();
checkValidTopic(topic);
- if( _subscriptions.containsKey(name))
+ if (_subscriptions.containsKey(name))
{
_subscriptions.get(name).close();
}
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ try
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ return subscriber;
+ }
+ catch (JMSException e)
+ {
+ deleteQueue(dest.getAMQQueueName());
+ throw e;
+ }
- return subscriber;
}
/**