diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-02-04 14:16:37 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-02-04 14:16:37 +0000 |
| commit | 67d46dc8034c3f836cf49e8eaf818f983232dc38 (patch) | |
| tree | 82602371e8c04e6e35fb92a32f8745303ff6ff51 /qpid/java/client | |
| parent | d02bb3e83535d2f270a602184fb2e5838d2f7c63 (diff) | |
| download | qpid-python-67d46dc8034c3f836cf49e8eaf818f983232dc38.tar.gz | |
QPID-4312 : [Java Client] add option for verification of queue existence during creation of a MessageProducer
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1442128 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
5 files changed, 76 insertions, 13 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index a0e659c359..9612417266 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -183,6 +183,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // new amqp-0-10 list encoded format. private boolean _useLegacyStreamMessageFormat; + // When sending to a Queue destination for the first time, check that the queue is bound + private final boolean _validateQueueOnSend; + //used to track the last failover time for //Address resolution purposes private volatile long _lastFailoverTime = 0; @@ -310,6 +313,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT); } + if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null) + { + _validateQueueOnSend = Boolean.parseBoolean( + connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)); + } + else + { + _validateQueueOnSend = + Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); + } + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) { @@ -1441,7 +1456,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _delegate.getProtocolVersion(); } - + public String getBrokerUUID() { if(getProtocolVersion().equals(ProtocolVersion.v0_10)) @@ -1565,4 +1580,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate.setHeartbeatListener(listener); } + + public boolean validateQueueOnSend() + { + return _validateQueueOnSend; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 85c96bc3bb..8490a724bf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -584,7 +584,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic rk = routingKey.toString(); } - return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null); + return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null); } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) @@ -1605,4 +1605,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 5cd596108a..20eaca44ae 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -42,6 +42,8 @@ import org.slf4j.Logger; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; private final Logger _logger ; @@ -291,7 +293,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac checkPreConditions(); checkInitialDestination(); - synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); @@ -455,7 +456,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac JMSException ex = new JMSException("Error validating destination"); ex.initCause(e); ex.setLinkedException(e); - + throw ex; } amqDestination.setExchangeExistsChecked(true); @@ -546,7 +547,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException + private void checkPreConditions() throws JMSException { checkNotClosed(); @@ -560,15 +561,16 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - private void checkInitialDestination() + private void checkInitialDestination() throws JMSException { if (_destination == null) { throw new UnsupportedOperationException("Destination is null"); } + checkValidQueue(); } - private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException + private void checkDestination(Destination suppliedDestination) throws JMSException { if ((_destination != null) && (suppliedDestination != null)) { @@ -576,6 +578,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } + if(suppliedDestination instanceof AMQQueue) + { + AMQQueue destination = (AMQQueue) suppliedDestination; + checkValidQueue(destination); + } if (suppliedDestination == null) { throw new InvalidDestinationException("Supplied Destination was invalid"); @@ -583,6 +590,42 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } + void checkValidQueue() throws JMSException + { + if(_destination instanceof AMQQueue) + { + checkValidQueue(_destination); + } + } + void checkValidQueue(AMQDestination destination) throws JMSException + { + if (!destination.isCheckedForQueueBinding() && validateQueueOnSend()) + { + if (getSession().isStrictAMQP()) + { + getLogger().warn("AMQP does not support destination validation before publish, "); + destination.setCheckedForQueueBinding(true); + } + else + { + if (isBound(destination)) + { + destination.setCheckedForQueueBinding(true); + } + else + { + throw new InvalidDestinationException("Queue: " + destination.getName() + + " is not a valid destination (no bindings on server"); + } + } + } + } + + private boolean validateQueueOnSend() + { + return _connection.validateQueueOnSend(); + } + /** * The session used to create this producer */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 237925f24b..c4fbeb5607 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -20,9 +20,8 @@ */ package org.apache.qpid.jms; -import org.apache.qpid.framing.AMQShortString; - import java.util.List; +import org.apache.qpid.framing.AMQShortString; /** Connection URL format @@ -35,7 +34,7 @@ public interface ConnectionURL public static final String AMQ_PROTOCOL = "amqp"; public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence"; public static final String OPTIONS_MAXPREFETCH = "maxprefetch"; - public static final String OPTIONS_SYNC_ACK = "sync_ack"; + public static final String OPTIONS_SYNC_ACK = "sync_ack"; public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format"; public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format"; @@ -62,9 +61,11 @@ public interface ConnectionURL public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; + public static final String OPTIONS_VERIFY_QUEUE_ON_SEND = "verifyQueueOnSend"; + public static final byte URL_0_8 = 1; public static final byte URL_0_10 = 2; - + String getURL(); String getFailoverMethod(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index d4e0ee60a6..40ed9319f1 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -670,7 +670,6 @@ public class AMQSession_0_10Test extends QpidTestCase if (m instanceof ExchangeBound) { ExchangeBoundResult struc = new ExchangeBoundResult(); - struc.setQueueNotFound(true); result.setValue(struc); } else if (m instanceof ExchangeQuery) |
