diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2009-02-25 23:21:01 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2009-02-25 23:21:01 +0000 |
| commit | c620d4547f7dd7fb5f2a08e61ea113e58ce379e8 (patch) | |
| tree | 62bf1dc4edf6064939ead47f7c984e07d5a6cf14 /java/client | |
| parent | 58336d355bea1c1001356bbab67d4b758ef6b079 (diff) | |
| download | qpid-python-c620d4547f7dd7fb5f2a08e61ea113e58ce379e8.tar.gz | |
This is related to QPID-1106 and QPID-1677
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747962 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
5 files changed, 100 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6c9fcc0f4c..5c48d73e43 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -268,6 +268,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates whether persistent messages are synchronized private boolean _syncPersistence; + //Indicates whether we need to sync on every message ack + private boolean _syncAck; + + //Indicates the sync publish options (persistent|all) + //By default it's async publish + private String _syncPublish = ""; + /** * @param broker brokerdetails * @param username username @@ -348,25 +355,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { // set this connection maxPrefetch - if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { - _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH)); } else { // use the defaul value set for all connections _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, - ClientProperties.MAX_PREFETCH_DEFAULT)); + ClientProperties.MAX_PREFETCH_DEFAULT)); } - if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) { - _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE)); + _syncPersistence = + Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); } else { // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); + if (_syncPersistence) + { + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); + } + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) + { + _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK)); + } + else + { + // use the defaul value set for all connections + _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) + { + _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH); + } + else + { + // use the defaul value set for all connections + _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } _failoverPolicy = new FailoverPolicy(connectionURL, this); @@ -1469,6 +1504,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _syncPersistence; } + /** + * Indicates whether we need to sync on every message ack + */ + public boolean getSyncAck() + { + return _syncAck; + } + + public String getSyncPublish() + { + return _syncPublish; + } + public void setIdleTimeout(long l) { _delegate.setIdleTimeout(l); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 954a3bc28f..ce9ee8a214 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + enum PublishMode { ASYNC_PUBLISH_ALLL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; + protected final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQConnection _connection; @@ -120,6 +122,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac protected String _userID; // ref user id used in the connection. private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + + protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALLL; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, @@ -141,6 +145,26 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _mandatory = mandatory; _waitUntilSent = waitUntilSent; _userID = connection.getUsername(); + setPublishMode(); + } + + void setPublishMode() + { + // Publish mode could be configured at destination level as well. + // Will add support for this when we provide a more robust binding URL + + String syncPub = _connection.getSyncPublish(); + // Support for deprecated option sync_persistence + if (syncPub.equals("persistent") || _connection.getSyncPersistence()) + { + publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT; + } + else if (syncPub.equals("all")) + { + publishMode = PublishMode.SYNC_PUBLISH_ALL; + } + + _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); } void resubscribe() throws AMQException diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 4e5077f0cd..b8c5fc8faf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -151,9 +151,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer ((AMQSession_0_10) getSession()).getQpidSession(); // if true, we need to sync the delivery of this message - boolean sync = (deliveryMode == DeliveryMode.PERSISTENT && - getSession().getAMQConnection().getSyncPersistence()); + boolean sync = false; + sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) || + (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && + deliveryMode == DeliveryMode.PERSISTENT) + ); + org.apache.mina.common.ByteBuffer data = message.getData(); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); diff --git a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 986154cda8..3627618e68 100644 --- a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -47,6 +47,19 @@ public class ClientProperties */ public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence"; + /** + * When true a sync command is sent after sending a message ack. + * type: boolean + */ + public static final String SYNC_ACK_PROP_NAME = "sync_ack"; + + /** + * sync_publish property - {persistent|all} + * If set to 'persistent',then persistent messages will be publish synchronously + * If set to 'all', then all messages regardless of the delivery mode will be + * published synchronously. + */ + public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish"; /** * This value will be used in the following settings diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index da8cd4f750..03ab967c36 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -33,9 +33,11 @@ import java.util.List; */ public interface ConnectionURL { - public static final String AMQ_SYNC_PERSISTENCE = "sync_persistence"; - public static final String AMQ_MAXPREFETCH = "maxprefetch"; public static final String AMQ_PROTOCOL = "amqp"; + public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence"; + public static final String OPTIONS_MAXPREFETCH = "maxprefetch"; + public static final String OPTIONS_SYNC_ACK = "sync_ack"; + public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; |
