diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-05-22 13:24:13 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-05-22 13:24:13 +0000 |
| commit | 363ceb155f788fe414b29d4e10e5533bac7e7c73 (patch) | |
| tree | f28bcb885b948575b0f7e55b5b789518d334e0f7 /java/client/src | |
| parent | ac17b670c3d5fe76fcb65d01fa2cf76fe514406c (diff) | |
| download | qpid-python-363ceb155f788fe414b29d4e10e5533bac7e7c73.tar.gz | |
QPID-1085: If an error occurs creating a durable subscriber with a selector delete the queue that was created.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659105 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
3 files changed, 106 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 404c0cd381..b747e31016 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -28,8 +28,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -58,7 +58,6 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; -import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -80,12 +79,12 @@ import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -927,12 +926,21 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess checkNotClosed(); checkValidTopic(topic); AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + try + { + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - return subscriber; + return subscriber; + } + catch (JMSException e) + { + deleteQueue(dest.getAMQQueueName()); + throw e; + } + } public MapMessage createMapMessage() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index cd1aa5aa8c..d071bcf0c2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -130,18 +130,26 @@ public class AMQSession_0_10 extends AMQSession { checkNotClosed(); checkValidTopic(topic); - if( _subscriptions.containsKey(name)) + if (_subscriptions.containsKey(name)) { _subscriptions.get(name).close(); } AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + try + { + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + return subscriber; + } + catch (JMSException e) + { + deleteQueue(dest.getAMQQueueName()); + throw e; + } - return subscriber; } /** diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 50ed1dee9e..6856ad34fb 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -33,6 +33,9 @@ import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Connection; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -273,6 +276,76 @@ public class DurableSubscriptionTest extends QpidTestCase con3.close(); } + /*** + * This tests the fix for QPID-1085 + * Creates a durable subscriber with an invalid selector, checks that the + * exception is thrown correctly and that the subscription is not created. + * @throws Exception + */ + public void testDurableWithInvalidSelector() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "MyTestDurableWithInvalidSelectorTopic"); + MessageProducer producer = session.createProducer(topic); + producer.send(session.createTextMessage("testDurableWithInvalidSelector1")); + try + { + TopicSubscriber deadSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub", + "=TEST 'test", true); + assertNull("Subscriber should not have been created", deadSubscriber); + } + catch (InvalidSelectorException e) + { + // This was expected + } + + TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub"); + assertNotNull("Subscriber should have been created", liveSubscriber); + + producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); + + Message msg = liveSubscriber.receive(); + assertNotNull ("Message should have been received", msg); + assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); + assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); + } + + /*** + * This tests the fix for QPID-1085 + * Creates a durable subscriber with an invalid destination, checks that the + * exception is thrown correctly and that the subscription is not created. + * @throws Exception + */ + public void testDurableWithInvalidDestination() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "testDurableWithInvalidDestinationTopic"); + try + { + TopicSubscriber deadSubscriber = session.createDurableSubscriber(null, "testDurableWithInvalidDestinationsub"); + assertNull("Subscriber should not have been created", deadSubscriber); + } + catch (InvalidDestinationException e) + { + // This was expected + } + MessageProducer producer = session.createProducer(topic); + producer.send(session.createTextMessage("testDurableWithInvalidSelector1")); + + TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidDestinationsub"); + assertNotNull("Subscriber should have been created", liveSubscriber); + + producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); + Message msg = liveSubscriber.receive(); + assertNotNull ("Message should have been received", msg); + assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); + assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(DurableSubscriptionTest.class); |
