From 3396562f8766d757a853573a5a6627f2cb0c8d23 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 4 Feb 2013 14:16:37 +0000 Subject: QPID-4312 : [Java Client] add option for verification of queue existence during creation of a MessageProducer git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1442128 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 22 ++++- .../org/apache/qpid/client/AMQSession_0_10.java | 4 +- .../apache/qpid/client/BasicMessageProducer.java | 53 ++++++++++-- .../java/org/apache/qpid/jms/ConnectionURL.java | 9 +- .../apache/qpid/client/AMQSession_0_10Test.java | 1 - .../qpid/configuration/ClientProperties.java | 1 + .../destination/AddressBasedDestinationTest.java | 2 +- .../test/unit/basic/InvalidDestinationTest.java | 96 +++++++++++++++++++--- .../apache/qpid/test/utils/QpidBrokerTestCase.java | 21 ++++- 9 files changed, 180 insertions(+), 29 deletions(-) (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index a0e659c359..9612417266 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -183,6 +183,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // new amqp-0-10 list encoded format. private boolean _useLegacyStreamMessageFormat; + // When sending to a Queue destination for the first time, check that the queue is bound + private final boolean _validateQueueOnSend; + //used to track the last failover time for //Address resolution purposes private volatile long _lastFailoverTime = 0; @@ -310,6 +313,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT); } + if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null) + { + _validateQueueOnSend = Boolean.parseBoolean( + connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)); + } + else + { + _validateQueueOnSend = + Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); + } + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) { @@ -1441,7 +1456,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _delegate.getProtocolVersion(); } - + public String getBrokerUUID() { if(getProtocolVersion().equals(ProtocolVersion.v0_10)) @@ -1565,4 +1580,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate.setHeartbeatListener(listener); } + + public boolean validateQueueOnSend() + { + return _validateQueueOnSend; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 85c96bc3bb..8490a724bf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -584,7 +584,7 @@ public class AMQSession_0_10 extends AMQSession args) @@ -1605,4 +1605,4 @@ public class AMQSession_0_10 extends AMQSession options = Collections.singletonMap(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND, "true"); + doInvalidDestinationOnMessageProducer(getConnectionWithOptions(options)); + + } + + private void doInvalidDestinationOnMessageProducer(Connection connection) throws JMSException + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue invalidDestination = session.createQueue("unknownQ"); + + Queue validDestination = session.createQueue("knownQ"); + + // This is the only easy way to create and bind a queue from the API :-( + session.createConsumer(validDestination); + + MessageProducer sender; + TextMessage msg = session.createTextMessage("Hello"); + try + { + sender = session.createProducer(invalidDestination); + sender.send(msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + + + sender = session.createProducer(null); + invalidDestination = new AMQQueue("amq.direct","unknownQ"); + + try + { + sender.send(invalidDestination,msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + sender.send(validDestination, msg); + sender.close(); + sender = session.createProducer(validDestination); + sender.send(msg); + + Topic topic = session.createTopic("randomTopic"); + sender = session.createProducer(topic); + sender.send(msg); + } + + public static junit.framework.Test suite() { diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index bad1551c59..fe116f175b 100755 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -20,18 +20,15 @@ package org.apache.qpid.test.utils; import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; -import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -49,13 +46,13 @@ import javax.jms.Topic; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; - import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; @@ -1131,6 +1128,22 @@ public class QpidBrokerTestCase extends QpidTestCase return getConnection(GUEST_USERNAME, GUEST_PASSWORD); } + public Connection getConnectionWithOptions(Map options) + throws URLSyntaxException, NamingException, JMSException + { + ConnectionURL curl = new AMQConnectionURL(getConnectionFactory().getConnectionURLString()); + for(Map.Entry entry : options.entrySet()) + { + curl.setOption(entry.getKey(), entry.getValue()); + } + curl = new AMQConnectionURL(curl.toString()); + + curl.setUsername(GUEST_USERNAME); + curl.setPassword(GUEST_PASSWORD); + return getConnection(curl); + } + + public Connection getConnection(ConnectionURL url) throws JMSException { _logger.info(url.getURL()); -- cgit v1.2.1