diff options
4 files changed, 26 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 566582e666..8ab16c65c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -155,6 +155,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // this connection maximum number of prefetched messages private long _maxPrefetch; + //Indicates whether persistent messages are synchronized + private boolean _syncPersistence; + /** * @param broker brokerdetails * @param username username @@ -245,6 +248,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _maxPrefetch = ClientProperties.MAX_PREFETCH; } + if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) + { + _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + } + else + { + // use the defaul value set for all connections + _syncPersistence = ClientProperties.SYNC_PERSISTENT; + } + _failoverPolicy = new FailoverPolicy(connectionURL); if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM)) { @@ -1203,4 +1216,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _maxPrefetch; } + + /** + * Indicates whether persistent messages are synchronized + * + * @return true if persistent messages are synchronized false otherwise + */ + public boolean getSyncPersistence() + { + return _syncPersistence; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 167810a591..2e31c43b4c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -218,7 +218,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer message.get010Message(), org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); - if(deliveryMode == DeliveryMode.PERSISTENT && ClientProperties.FULLY_SYNC ) + if(deliveryMode == DeliveryMode.PERSISTENT && getSession().getAMQConnection().getSyncPersistence()) { // we need to sync the delivery of this message ((AMQSession_0_10) getSession()).getQpidSession().sync(); diff --git a/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java index 73c59dcf96..ad30a4b8c7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java +++ b/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java @@ -32,5 +32,5 @@ public class ClientProperties /** * When true a sync command is sent after every persistent messages. */ - public static boolean FULLY_SYNC = Boolean.getBoolean("fully_sync"); + public static boolean SYNC_PERSISTENT = Boolean.getBoolean("sync_persistence"); } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index ea0fecb278..11376467d7 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -32,6 +32,7 @@ import java.util.List; */ public interface ConnectionURL { + public static final String AMQ_SYNC_PERSISTENCE = "sync_persistence"; public static final String AMQ_MAXPREFETCH = "maxprefetch"; public static final String AMQ_PROTOCOL = "amqp"; public static final String OPTIONS_BROKERLIST = "brokerlist"; |
