summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2009-02-25 23:21:01 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2009-02-25 23:21:01 +0000
commitc620d4547f7dd7fb5f2a08e61ea113e58ce379e8 (patch)
tree62bf1dc4edf6064939ead47f7c984e07d5a6cf14 /java/client
parent58336d355bea1c1001356bbab67d4b758ef6b079 (diff)
downloadqpid-python-c620d4547f7dd7fb5f2a08e61ea113e58ce379e8.tar.gz
This is related to QPID-1106 and QPID-1677
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747962 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java58
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java6
5 files changed, 100 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 6c9fcc0f4c..5c48d73e43 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -268,6 +268,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
+ //Indicates whether we need to sync on every message ack
+ private boolean _syncAck;
+
+ //Indicates the sync publish options (persistent|all)
+ //By default it's async publish
+ private String _syncPublish = "";
+
/**
* @param broker brokerdetails
* @param username username
@@ -348,25 +355,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
// set this connection maxPrefetch
- if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
- _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
}
else
{
// use the defaul value set for all connections
_maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
- if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
- _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
+ _syncPersistence =
+ Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
}
else
{
// use the defaul value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
+ if (_syncPersistence)
+ {
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
+ }
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
+ {
+ _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
+ }
+ else
+ {
+ // use the defaul value set for all connections
+ _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
+ {
+ _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+ }
+ else
+ {
+ // use the defaul value set for all connections
+ _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
_failoverPolicy = new FailoverPolicy(connectionURL, this);
@@ -1469,6 +1504,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _syncPersistence;
}
+ /**
+ * Indicates whether we need to sync on every message ack
+ */
+ public boolean getSyncAck()
+ {
+ return _syncAck;
+ }
+
+ public String getSyncPublish()
+ {
+ return _syncPublish;
+ }
+
public void setIdleTimeout(long l)
{
_delegate.setIdleTimeout(l);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 954a3bc28f..ce9ee8a214 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
+ enum PublishMode { ASYNC_PUBLISH_ALLL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
+
protected final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQConnection _connection;
@@ -120,6 +122,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
protected String _userID; // ref user id used in the connection.
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+
+ protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALLL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
@@ -141,6 +145,26 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
_userID = connection.getUsername();
+ setPublishMode();
+ }
+
+ void setPublishMode()
+ {
+ // Publish mode could be configured at destination level as well.
+ // Will add support for this when we provide a more robust binding URL
+
+ String syncPub = _connection.getSyncPublish();
+ // Support for deprecated option sync_persistence
+ if (syncPub.equals("persistent") || _connection.getSyncPersistence())
+ {
+ publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
+ }
+ else if (syncPub.equals("all"))
+ {
+ publishMode = PublishMode.SYNC_PUBLISH_ALL;
+ }
+
+ _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
}
void resubscribe() throws AMQException
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 4e5077f0cd..b8c5fc8faf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -151,9 +151,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
((AMQSession_0_10) getSession()).getQpidSession();
// if true, we need to sync the delivery of this message
- boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
- getSession().getAMQConnection().getSyncPersistence());
+ boolean sync = false;
+ sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
+ (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT &&
+ deliveryMode == DeliveryMode.PERSISTENT)
+ );
+
org.apache.mina.common.ByteBuffer data = message.getData();
ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
diff --git a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index 986154cda8..3627618e68 100644
--- a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -47,6 +47,19 @@ public class ClientProperties
*/
public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
+ /**
+ * When true a sync command is sent after sending a message ack.
+ * type: boolean
+ */
+ public static final String SYNC_ACK_PROP_NAME = "sync_ack";
+
+ /**
+ * sync_publish property - {persistent|all}
+ * If set to 'persistent',then persistent messages will be publish synchronously
+ * If set to 'all', then all messages regardless of the delivery mode will be
+ * published synchronously.
+ */
+ public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
/**
* This value will be used in the following settings
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index da8cd4f750..03ab967c36 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -33,9 +33,11 @@ import java.util.List;
*/
public interface ConnectionURL
{
- public static final String AMQ_SYNC_PERSISTENCE = "sync_persistence";
- public static final String AMQ_MAXPREFETCH = "maxprefetch";
public static final String AMQ_PROTOCOL = "amqp";
+ public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence";
+ public static final String OPTIONS_MAXPREFETCH = "maxprefetch";
+ public static final String OPTIONS_SYNC_ACK = "sync_ack";
+ public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";