diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-09 13:16:28 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-09 13:16:28 +0000 |
| commit | 2f0775218b8767e6d76ec2c9b8a823e64ab7a9a2 (patch) | |
| tree | 0093d68e66c62c2d3d74033a11092fd8519e545e /qpid/java | |
| parent | 8cf55327406939cb023e2d81cf53d951ac099152 (diff) | |
| download | qpid-python-2f0775218b8767e6d76ec2c9b8a823e64ab7a9a2.tar.gz | |
added property for setting max message size before a sync
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@593519 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java | 3 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java | 42 |
2 files changed, 41 insertions, 4 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index a46c7f3cd5..9bf9fa5e64 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -86,6 +86,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor p.load(new BufferedInputStream(new FileInputStream(file))); environment.putAll(p); + System.getProperties().putAll(p); _logger.info("Loaded Context Properties:" + environment.toString()); } else @@ -98,6 +99,8 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL)); } + + createConnectionFactories(data, environment); createDestinations(data, environment); diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index dd2481368f..ce8e98bc68 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -4,6 +4,8 @@ import java.io.EOFException; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Properties; +import java.nio.ByteBuffer; import org.apache.qpidity.transport.Option; import org.apache.qpidity.QpidException; @@ -18,11 +20,31 @@ import org.apache.qpidity.nclient.MessagePartListener; */ public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession { + static + { + MAX_NOT_SYNC_DATA_LENGH = 200000 * 1024; + String max = "message_size_before_sync"; + if (System.getProperties().containsKey(max)) + { + try + { + MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max)); + } + catch (NumberFormatException e) + { + // use default size + } + } + } + + private static long MAX_NOT_SYNC_DATA_LENGH ; private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); private ClosedListener _exceptionListner; private RangeSet _acquiredMessages; private RangeSet _rejectedMessages; - + private long _dataSentNotSync; + + public void messageAcknowledge(RangeSet ranges) { for (Range range : ranges) @@ -42,12 +64,24 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen { // The javadoc clearly says that this method is suitable for small messages // therefore reading the content in one shot. + ByteBuffer data = msg.readData(); + _dataSentNotSync = _dataSentNotSync + msg.getMessageProperties().getContentLength() + data.limit(); super.messageTransfer(destination, confirmMode, acquireMode); super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); - super.data(msg.readData()); - super.endData(); + super.data( data ); + super.endData(); + if( _dataSentNotSync >= MAX_NOT_SYNC_DATA_LENGH) + { + sync(); + } } - + + public void sync() + { + _dataSentNotSync = 0; + super.sync(); + } + public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException { super.messageTransfer(destination, confirmMode, acquireMode); |
