summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-02-04 14:16:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-02-04 14:16:37 +0000
commit67d46dc8034c3f836cf49e8eaf818f983232dc38 (patch)
tree82602371e8c04e6e35fb92a32f8745303ff6ff51 /qpid/java/client
parentd02bb3e83535d2f270a602184fb2e5838d2f7c63 (diff)
downloadqpid-python-67d46dc8034c3f836cf49e8eaf818f983232dc38.tar.gz
QPID-4312 : [Java Client] add option for verification of queue existence during creation of a MessageProducer
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1442128 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java22
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java53
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java9
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java1
5 files changed, 76 insertions, 13 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index a0e659c359..9612417266 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -183,6 +183,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// new amqp-0-10 list encoded format.
private boolean _useLegacyStreamMessageFormat;
+ // When sending to a Queue destination for the first time, check that the queue is bound
+ private final boolean _validateQueueOnSend;
+
//used to track the last failover time for
//Address resolution purposes
private volatile long _lastFailoverTime = 0;
@@ -310,6 +313,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT);
}
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null)
+ {
+ _validateQueueOnSend = Boolean.parseBoolean(
+ connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
+ }
+ else
+ {
+ _validateQueueOnSend =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
+ }
+
+
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
if (_logger.isDebugEnabled())
{
@@ -1441,7 +1456,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _delegate.getProtocolVersion();
}
-
+
public String getBrokerUUID()
{
if(getProtocolVersion().equals(ProtocolVersion.v0_10))
@@ -1565,4 +1580,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_delegate.setHeartbeatListener(listener);
}
+
+ public boolean validateQueueOnSend()
+ {
+ return _validateQueueOnSend;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 85c96bc3bb..8490a724bf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -584,7 +584,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
rk = routingKey.toString();
}
- return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
+ return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null);
}
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
@@ -1605,4 +1605,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 5cd596108a..20eaca44ae 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -42,6 +42,8 @@ import org.slf4j.Logger;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
+
+
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
private final Logger _logger ;
@@ -291,7 +293,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
checkPreConditions();
checkInitialDestination();
-
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
@@ -455,7 +456,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
JMSException ex = new JMSException("Error validating destination");
ex.initCause(e);
ex.setLinkedException(e);
-
+
throw ex;
}
amqDestination.setExchangeExistsChecked(true);
@@ -546,7 +547,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+ private void checkPreConditions() throws JMSException
{
checkNotClosed();
@@ -560,15 +561,16 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- private void checkInitialDestination()
+ private void checkInitialDestination() throws JMSException
{
if (_destination == null)
{
throw new UnsupportedOperationException("Destination is null");
}
+ checkValidQueue();
}
- private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+ private void checkDestination(Destination suppliedDestination) throws JMSException
{
if ((_destination != null) && (suppliedDestination != null))
{
@@ -576,6 +578,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
"This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
+ if(suppliedDestination instanceof AMQQueue)
+ {
+ AMQQueue destination = (AMQQueue) suppliedDestination;
+ checkValidQueue(destination);
+ }
if (suppliedDestination == null)
{
throw new InvalidDestinationException("Supplied Destination was invalid");
@@ -583,6 +590,42 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
+ void checkValidQueue() throws JMSException
+ {
+ if(_destination instanceof AMQQueue)
+ {
+ checkValidQueue(_destination);
+ }
+ }
+ void checkValidQueue(AMQDestination destination) throws JMSException
+ {
+ if (!destination.isCheckedForQueueBinding() && validateQueueOnSend())
+ {
+ if (getSession().isStrictAMQP())
+ {
+ getLogger().warn("AMQP does not support destination validation before publish, ");
+ destination.setCheckedForQueueBinding(true);
+ }
+ else
+ {
+ if (isBound(destination))
+ {
+ destination.setCheckedForQueueBinding(true);
+ }
+ else
+ {
+ throw new InvalidDestinationException("Queue: " + destination.getName()
+ + " is not a valid destination (no bindings on server");
+ }
+ }
+ }
+ }
+
+ private boolean validateQueueOnSend()
+ {
+ return _connection.validateQueueOnSend();
+ }
+
/**
* The session used to create this producer
*/
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 237925f24b..c4fbeb5607 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.jms;
-import org.apache.qpid.framing.AMQShortString;
-
import java.util.List;
+import org.apache.qpid.framing.AMQShortString;
/**
Connection URL format
@@ -35,7 +34,7 @@ public interface ConnectionURL
public static final String AMQ_PROTOCOL = "amqp";
public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence";
public static final String OPTIONS_MAXPREFETCH = "maxprefetch";
- public static final String OPTIONS_SYNC_ACK = "sync_ack";
+ public static final String OPTIONS_SYNC_ACK = "sync_ack";
public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format";
public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format";
@@ -62,9 +61,11 @@ public interface ConnectionURL
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
+ public static final String OPTIONS_VERIFY_QUEUE_ON_SEND = "verifyQueueOnSend";
+
public static final byte URL_0_8 = 1;
public static final byte URL_0_10 = 2;
-
+
String getURL();
String getFailoverMethod();
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index d4e0ee60a6..40ed9319f1 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -670,7 +670,6 @@ public class AMQSession_0_10Test extends QpidTestCase
if (m instanceof ExchangeBound)
{
ExchangeBoundResult struc = new ExchangeBoundResult();
- struc.setQueueNotFound(true);
result.setValue(struc);
}
else if (m instanceof ExchangeQuery)