summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-11-09 13:16:28 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-11-09 13:16:28 +0000
commitd958c2503a14112128beb24ed6de1bfc56df8582 (patch)
treecbc874a1857342722caa3b19decece9499f6476a /java/client/src
parent6d0479d0584a3b9f8f15e5d7291f438c2822511e (diff)
downloadqpid-python-d958c2503a14112128beb24ed6de1bfc56df8582.tar.gz
added property for setting max message size before a sync
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593519 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java3
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java42
2 files changed, 41 insertions, 4 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index a46c7f3cd5..9bf9fa5e64 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -86,6 +86,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
p.load(new BufferedInputStream(new FileInputStream(file)));
environment.putAll(p);
+ System.getProperties().putAll(p);
_logger.info("Loaded Context Properties:" + environment.toString());
}
else
@@ -98,6 +99,8 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
_logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL));
}
+
+
createConnectionFactories(data, environment);
createDestinations(data, environment);
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index dd2481368f..ce8e98bc68 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -4,6 +4,8 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
+import java.nio.ByteBuffer;
import org.apache.qpidity.transport.Option;
import org.apache.qpidity.QpidException;
@@ -18,11 +20,31 @@ import org.apache.qpidity.nclient.MessagePartListener;
*/
public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession
{
+ static
+ {
+ MAX_NOT_SYNC_DATA_LENGH = 200000 * 1024;
+ String max = "message_size_before_sync";
+ if (System.getProperties().containsKey(max))
+ {
+ try
+ {
+ MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max));
+ }
+ catch (NumberFormatException e)
+ {
+ // use default size
+ }
+ }
+ }
+
+ private static long MAX_NOT_SYNC_DATA_LENGH ;
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ClosedListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
-
+ private long _dataSentNotSync;
+
+
public void messageAcknowledge(RangeSet ranges)
{
for (Range range : ranges)
@@ -42,12 +64,24 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
{
// The javadoc clearly says that this method is suitable for small messages
// therefore reading the content in one shot.
+ ByteBuffer data = msg.readData();
+ _dataSentNotSync = _dataSentNotSync + msg.getMessageProperties().getContentLength() + data.limit();
super.messageTransfer(destination, confirmMode, acquireMode);
super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
- super.data(msg.readData());
- super.endData();
+ super.data( data );
+ super.endData();
+ if( _dataSentNotSync >= MAX_NOT_SYNC_DATA_LENGH)
+ {
+ sync();
+ }
}
-
+
+ public void sync()
+ {
+ _dataSentNotSync = 0;
+ super.sync();
+ }
+
public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
{
super.messageTransfer(destination, confirmMode, acquireMode);