summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-11-29 14:24:38 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-11-29 14:24:38 +0000
commit688b117f9955077b6f4c296a42d37d3229f9a7d3 (patch)
tree5ee6e6cea321bee7eed7d5e2615a898807d765a0 /qpid/java/client
parente38196b7d9cfa003a18abb59cdaa5e838c927576 (diff)
downloadqpid-python-688b117f9955077b6f4c296a42d37d3229f9a7d3.tar.gz
added flush
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@599452 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java18
1 files changed, 18 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index 89da0bb691..4c5993455c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -31,14 +31,26 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
// use default size
MAX_NOT_SYNC_DATA_LENGH = 200000000;
}
+ String flush = "message_size_before_flush";
+ try
+ {
+ MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000"));
+ }
+ catch (NumberFormatException e)
+ {
+ // use default size
+ MAX_NOT_FLUSH_DATA_LENGH = 20000000;
+ }
}
private static long MAX_NOT_SYNC_DATA_LENGH;
+ private static long MAX_NOT_FLUSH_DATA_LENGH;
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ClosedListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
private long _currentDataSizeNotSynced;
+ private long _currentDataSizeNotFlushed;
public void messageAcknowledge(RangeSet ranges)
@@ -80,6 +92,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
public void data(ByteBuffer buf)
{
_currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
+ _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining();
super.data(buf);
}
@@ -123,6 +136,11 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
{
sync();
}
+ if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH)
+ {
+ executionFlush();
+ _currentDataSizeNotFlushed = 0;
+ }
}
public RangeSet getAccquiredMessages()