diff options
Diffstat (limited to 'java')
9 files changed, 180 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index a0e659c359..9612417266 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -183,6 +183,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // new amqp-0-10 list encoded format. private boolean _useLegacyStreamMessageFormat; + // When sending to a Queue destination for the first time, check that the queue is bound + private final boolean _validateQueueOnSend; + //used to track the last failover time for //Address resolution purposes private volatile long _lastFailoverTime = 0; @@ -310,6 +313,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT); } + if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null) + { + _validateQueueOnSend = Boolean.parseBoolean( + connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)); + } + else + { + _validateQueueOnSend = + Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); + } + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) { @@ -1441,7 +1456,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _delegate.getProtocolVersion(); } - + public String getBrokerUUID() { if(getProtocolVersion().equals(ProtocolVersion.v0_10)) @@ -1565,4 +1580,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate.setHeartbeatListener(listener); } + + public boolean validateQueueOnSend() + { + return _validateQueueOnSend; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 85c96bc3bb..8490a724bf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -584,7 +584,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic rk = routingKey.toString(); } - return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null); + return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null); } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) @@ -1605,4 +1605,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } } -}
\ No newline at end of file +} diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 5cd596108a..20eaca44ae 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -42,6 +42,8 @@ import org.slf4j.Logger; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; private final Logger _logger ; @@ -291,7 +293,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac checkPreConditions(); checkInitialDestination(); - synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); @@ -455,7 +456,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac JMSException ex = new JMSException("Error validating destination"); ex.initCause(e); ex.setLinkedException(e); - + throw ex; } amqDestination.setExchangeExistsChecked(true); @@ -546,7 +547,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException + private void checkPreConditions() throws JMSException { checkNotClosed(); @@ -560,15 +561,16 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - private void checkInitialDestination() + private void checkInitialDestination() throws JMSException { if (_destination == null) { throw new UnsupportedOperationException("Destination is null"); } + checkValidQueue(); } - private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException + private void checkDestination(Destination suppliedDestination) throws JMSException { if ((_destination != null) && (suppliedDestination != null)) { @@ -576,6 +578,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } + if(suppliedDestination instanceof AMQQueue) + { + AMQQueue destination = (AMQQueue) suppliedDestination; + checkValidQueue(destination); + } if (suppliedDestination == null) { throw new InvalidDestinationException("Supplied Destination was invalid"); @@ -583,6 +590,42 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } + void checkValidQueue() throws JMSException + { + if(_destination instanceof AMQQueue) + { + checkValidQueue(_destination); + } + } + void checkValidQueue(AMQDestination destination) throws JMSException + { + if (!destination.isCheckedForQueueBinding() && validateQueueOnSend()) + { + if (getSession().isStrictAMQP()) + { + getLogger().warn("AMQP does not support destination validation before publish, "); + destination.setCheckedForQueueBinding(true); + } + else + { + if (isBound(destination)) + { + destination.setCheckedForQueueBinding(true); + } + else + { + throw new InvalidDestinationException("Queue: " + destination.getName() + + " is not a valid destination (no bindings on server"); + } + } + } + } + + private boolean validateQueueOnSend() + { + return _connection.validateQueueOnSend(); + } + /** * The session used to create this producer */ diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 237925f24b..c4fbeb5607 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -20,9 +20,8 @@ */ package org.apache.qpid.jms; -import org.apache.qpid.framing.AMQShortString; - import java.util.List; +import org.apache.qpid.framing.AMQShortString; /** Connection URL format @@ -35,7 +34,7 @@ public interface ConnectionURL public static final String AMQ_PROTOCOL = "amqp"; public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence"; public static final String OPTIONS_MAXPREFETCH = "maxprefetch"; - public static final String OPTIONS_SYNC_ACK = "sync_ack"; + public static final String OPTIONS_SYNC_ACK = "sync_ack"; public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format"; public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format"; @@ -62,9 +61,11 @@ public interface ConnectionURL public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; + public static final String OPTIONS_VERIFY_QUEUE_ON_SEND = "verifyQueueOnSend"; + public static final byte URL_0_8 = 1; public static final byte URL_0_10 = 2; - + String getURL(); String getFailoverMethod(); diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index d4e0ee60a6..40ed9319f1 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -670,7 +670,6 @@ public class AMQSession_0_10Test extends QpidTestCase if (m instanceof ExchangeBound) { ExchangeBoundResult struc = new ExchangeBoundResult(); - struc.setQueueNotFound(true); result.setValue(struc); } else if (m instanceof ExchangeQuery) diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 7594d87b93..7aa280ce02 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -203,6 +203,7 @@ public class ClientProperties * producer/consumer creation when using BindingURLs. */ public static final String QPID_DECLARE_EXCHANGES_PROP_NAME = "qpid.declare_exchanges"; + public static final String VERIFY_QUEUE_ON_SEND = "qpid.verify_queue_on_send"; private ClientProperties() diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 371b40bfc8..22a98b6f42 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1148,7 +1148,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); MessageProducer prod = ssn.createProducer(null); - Queue queue = ssn.createQueue("ADDR:amq.topic/test"); + Topic queue = ssn.createTopic("ADDR:amq.topic/test"); prod.send(queue,ssn.createTextMessage("A")); Message msg = cons.receive(1000); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java index 53f37cd915..05b19c3391 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java @@ -21,16 +21,23 @@ package org.apache.qpid.test.unit.basic; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - +import java.util.Collections; +import java.util.Map; +import javax.jms.Connection; import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class InvalidDestinationTest extends QpidBrokerTestCase { @@ -52,17 +59,22 @@ public class InvalidDestinationTest extends QpidBrokerTestCase public void testInvalidDestination() throws Exception { - Queue invalidDestination = new AMQQueue("amq.direct","unknownQ"); - AMQQueue validDestination = new AMQQueue("amq.direct","knownQ"); + QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + Queue invalidDestination = queueSession.createQueue("unknownQ"); + + Queue validDestination = queueSession.createQueue(getTestQueueName()); + // This is the only easy way to create and bind a queue from the API :-( queueSession.createConsumer(validDestination); + QueueSender sender; + TextMessage msg= queueSession.createTextMessage("Hello"); - QueueSender sender = queueSession.createSender(invalidDestination); - TextMessage msg = queueSession.createTextMessage("Hello"); try { + sender = queueSession.createSender(invalidDestination); + sender.send(msg); fail("Expected InvalidDestinationException"); } @@ -70,10 +82,8 @@ public class InvalidDestinationTest extends QpidBrokerTestCase { // pass } - sender.close(); sender = queueSession.createSender(null); - invalidDestination = new AMQQueue("amq.direct","unknownQ"); try { @@ -86,7 +96,6 @@ public class InvalidDestinationTest extends QpidBrokerTestCase } sender.send(validDestination,msg); sender.close(); - validDestination = new AMQQueue("amq.direct","knownQ"); sender = queueSession.createSender(validDestination); sender.send(msg); @@ -96,6 +105,71 @@ public class InvalidDestinationTest extends QpidBrokerTestCase } + + public void testInvalidDestinationOnMessageProducer() throws Exception + { + setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true"); + final AMQConnection connection = (AMQConnection) getConnection(); + doInvalidDestinationOnMessageProducer(connection); + + } + + + public void testInvalidDestinationOnMessageProducerURL() throws Exception + { + Map<String, String> options = Collections.singletonMap(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND, "true"); + doInvalidDestinationOnMessageProducer(getConnectionWithOptions(options)); + + } + + private void doInvalidDestinationOnMessageProducer(Connection connection) throws JMSException + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue invalidDestination = session.createQueue("unknownQ"); + + Queue validDestination = session.createQueue("knownQ"); + + // This is the only easy way to create and bind a queue from the API :-( + session.createConsumer(validDestination); + + MessageProducer sender; + TextMessage msg = session.createTextMessage("Hello"); + try + { + sender = session.createProducer(invalidDestination); + sender.send(msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + + + sender = session.createProducer(null); + invalidDestination = new AMQQueue("amq.direct","unknownQ"); + + try + { + sender.send(invalidDestination,msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + sender.send(validDestination, msg); + sender.close(); + sender = session.createProducer(validDestination); + sender.send(msg); + + Topic topic = session.createTopic("randomTopic"); + sender = session.createProducer(topic); + sender.send(msg); + } + + public static junit.framework.Test suite() { diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index bad1551c59..fe116f175b 100755 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -20,18 +20,15 @@ package org.apache.qpid.test.utils; import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; -import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -49,13 +46,13 @@ import javax.jms.Topic; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; - import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; @@ -1131,6 +1128,22 @@ public class QpidBrokerTestCase extends QpidTestCase return getConnection(GUEST_USERNAME, GUEST_PASSWORD); } + public Connection getConnectionWithOptions(Map<String, String> options) + throws URLSyntaxException, NamingException, JMSException + { + ConnectionURL curl = new AMQConnectionURL(getConnectionFactory().getConnectionURLString()); + for(Map.Entry<String,String> entry : options.entrySet()) + { + curl.setOption(entry.getKey(), entry.getValue()); + } + curl = new AMQConnectionURL(curl.toString()); + + curl.setUsername(GUEST_USERNAME); + curl.setPassword(GUEST_PASSWORD); + return getConnection(curl); + } + + public Connection getConnection(ConnectionURL url) throws JMSException { _logger.info(url.getURL()); |
