diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-29 14:24:38 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-29 14:24:38 +0000 |
| commit | 17873433da6881577925c3966308c672d1978d5a (patch) | |
| tree | a868caa69651dd8bb383e7d012d497f124c8e3d9 /java/client/src | |
| parent | a72c0139bfddbfeefeb720071f43f19c7d950ecd (diff) | |
| download | qpid-python-17873433da6881577925c3966308c672d1978d5a.tar.gz | |
added flush
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@599452 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java | 18 |
1 files changed, 18 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index 89da0bb691..4c5993455c 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -31,14 +31,26 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen // use default size MAX_NOT_SYNC_DATA_LENGH = 200000000; } + String flush = "message_size_before_flush"; + try + { + MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_FLUSH_DATA_LENGH = 20000000; + } } private static long MAX_NOT_SYNC_DATA_LENGH; + private static long MAX_NOT_FLUSH_DATA_LENGH; private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); private ClosedListener _exceptionListner; private RangeSet _acquiredMessages; private RangeSet _rejectedMessages; private long _currentDataSizeNotSynced; + private long _currentDataSizeNotFlushed; public void messageAcknowledge(RangeSet ranges) @@ -80,6 +92,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen public void data(ByteBuffer buf) { _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining(); + _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining(); super.data(buf); } @@ -123,6 +136,11 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen { sync(); } + if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH) + { + executionFlush(); + _currentDataSizeNotFlushed = 0; + } } public RangeSet getAccquiredMessages() |
