summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-11-09 13:16:28 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-11-09 13:16:28 +0000
commit2f0775218b8767e6d76ec2c9b8a823e64ab7a9a2 (patch)
tree0093d68e66c62c2d3d74033a11092fd8519e545e /qpid/java
parent8cf55327406939cb023e2d81cf53d951ac099152 (diff)
downloadqpid-python-2f0775218b8767e6d76ec2c9b8a823e64ab7a9a2.tar.gz
added property for setting max message size before a sync
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@593519 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java42
2 files changed, 41 insertions, 4 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index a46c7f3cd5..9bf9fa5e64 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -86,6 +86,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
p.load(new BufferedInputStream(new FileInputStream(file)));
environment.putAll(p);
+ System.getProperties().putAll(p);
_logger.info("Loaded Context Properties:" + environment.toString());
}
else
@@ -98,6 +99,8 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
_logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL));
}
+
+
createConnectionFactories(data, environment);
createDestinations(data, environment);
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index dd2481368f..ce8e98bc68 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -4,6 +4,8 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
+import java.nio.ByteBuffer;
import org.apache.qpidity.transport.Option;
import org.apache.qpidity.QpidException;
@@ -18,11 +20,31 @@ import org.apache.qpidity.nclient.MessagePartListener;
*/
public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession
{
+ static
+ {
+ MAX_NOT_SYNC_DATA_LENGH = 200000 * 1024;
+ String max = "message_size_before_sync";
+ if (System.getProperties().containsKey(max))
+ {
+ try
+ {
+ MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max));
+ }
+ catch (NumberFormatException e)
+ {
+ // use default size
+ }
+ }
+ }
+
+ private static long MAX_NOT_SYNC_DATA_LENGH ;
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ClosedListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
-
+ private long _dataSentNotSync;
+
+
public void messageAcknowledge(RangeSet ranges)
{
for (Range range : ranges)
@@ -42,12 +64,24 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
{
// The javadoc clearly says that this method is suitable for small messages
// therefore reading the content in one shot.
+ ByteBuffer data = msg.readData();
+ _dataSentNotSync = _dataSentNotSync + msg.getMessageProperties().getContentLength() + data.limit();
super.messageTransfer(destination, confirmMode, acquireMode);
super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
- super.data(msg.readData());
- super.endData();
+ super.data( data );
+ super.endData();
+ if( _dataSentNotSync >= MAX_NOT_SYNC_DATA_LENGH)
+ {
+ sync();
+ }
}
-
+
+ public void sync()
+ {
+ _dataSentNotSync = 0;
+ super.sync();
+ }
+
public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
{
super.messageTransfer(destination, confirmMode, acquireMode);