summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-05-05 09:59:01 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-05-05 09:59:01 +0000
commit02800af6c756a6f513783372faa682b6cf08e776 (patch)
treeab446f910c4627b6899497fae4783fd0cf255797 /java
parentd2c71a7e1e475a67adcdcdb88349c5fb89300faf (diff)
downloadqpid-python-02800af6c756a6f513783372faa682b6cf08e776.tar.gz
QPID-1018: added org.apache.qpid.client.configuration and add io props
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@653399 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/ClientProperties.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java70
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java14
4 files changed, 80 insertions, 53 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ae54d9b9fe..aa402a436e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -23,19 +23,13 @@ package org.apache.qpid.client;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
@@ -355,7 +349,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
else
{
// use the defaul value set for all connections
- _maxPrefetch = ClientProperties.MAX_PREFETCH;
+ _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
@@ -365,7 +360,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
else
{
// use the defaul value set for all connections
- _syncPersistence = ClientProperties.SYNC_PERSISTENT;
+ _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
}
_failoverPolicy = new FailoverPolicy(connectionURL);
diff --git a/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
deleted file mode 100644
index ad30a4b8c7..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.client;
-
-/**
- * This class centralized the Qpid client properties.
- */
-public class ClientProperties
-{
-
- /**
- * The maximum number of pre-fetched messages per destination
- */
- public static long MAX_PREFETCH = Long.valueOf(System.getProperties().getProperty("max_prefetch", "1000"));
-
- /**
- * When true a sync command is sent after every persistent messages.
- */
- public static boolean SYNC_PERSISTENT = Boolean.getBoolean("sync_persistence");
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
new file mode 100644
index 0000000000..c7fc21a121
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -0,0 +1,70 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.client.configuration;
+
+/**
+ * This class centralized the Qpid client properties.
+ */
+public class ClientProperties
+{
+
+ /**
+ * This property is currently used within the 0.10 code path only
+ * The maximum number of pre-fetched messages per destination
+ * This property is used for all the connection unless it is overwritten by the connectionURL
+ * type: long
+ */
+ public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
+ public static final String MAX_PREFETCH_DEFAULT = "1000";
+
+ /**
+ * When true a sync command is sent after every persistent messages.
+ * type: boolean
+ */
+ public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
+
+ /**
+ * ==========================================================
+ * Those properties are used when the io size should be bounded
+ * ==========================================================
+ */
+
+ /**
+ * When set to true the io layer throttle down producers and consumers
+ * when written or read messages are accumulating and exceeding a certain size.
+ * This is especially useful when a the producer rate is greater than the network
+ * speed.
+ * type: boolean
+ */
+ public static final String PROTECTIO_PROP_NAME = "protectio";
+
+ //=== The following properties are only used when the previous one is true.
+ /**
+ * Max size of read messages that can be stored within the MINA layer
+ * type: int
+ */
+ public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
+ public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
+ /**
+ * Max size of written messages that can be stored within the MINA layer
+ * type: int
+ */
+ public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
+ public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 97b3660240..2d8074eea2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -37,6 +37,7 @@ import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.SSLConfiguration;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
@@ -160,11 +161,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
- /** Default buffer size for pending messages reads */
- private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
-
- /** Default buffer size for pending messages writes */
- private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -222,7 +218,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.error(e.getMessage(), e);
}
- if (Boolean.getBoolean("protectio"))
+ if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
{
try
{
@@ -232,11 +228,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
+ readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
+ ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
+ writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
+ ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
writefilter.attach(chain);
session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");