From a57a8590b2aad71438fde0b977cd928c70dcdf2f Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 11 Feb 2009 19:16:48 +0000 Subject: QPID-1658: added a byte limit for the number of commands in the session replay buffer, and made the buffer length configurable git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@743455 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/transport/Method.java | 13 +++++++++++++ .../src/main/java/org/apache/qpid/transport/Session.java | 15 ++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/transport/Method.java b/java/common/src/main/java/org/apache/qpid/transport/Method.java index 6b99f6d5d3..09cfd119be 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -112,6 +112,19 @@ public abstract class Method extends Struct implements ProtocolEvent throw new UnsupportedOperationException(); } + public int getBodySize() + { + ByteBuffer body = getBody(); + if (body == null) + { + return 0; + } + else + { + return body.remaining(); + } + } + public abstract byte getEncodedTrack(); public abstract void dispatch(C context, MethodDelegate delegate); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 951370a124..f94edcc655 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -95,7 +95,9 @@ public class Session extends SessionInvoker // outgoing command count private int commandsOut = 0; - private Method[] commands = new Method[64*1024]; + private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)]; + private int commandBytes = 0; + private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024); private int maxComplete = commandsOut - 1; private boolean needSync = false; @@ -432,7 +434,13 @@ public class Session extends SessionInvoker int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { - commands[mod(id, commands.length)] = null; + int idx = mod(id, commands.length); + Method m = commands[idx]; + if (m != null) + { + commandBytes -= m.getBodySize(); + } + commands[idx] = null; } if (le(lower, maxComplete + 1)) { @@ -462,7 +470,7 @@ public class Session extends SessionInvoker final private boolean isFull(int id) { - return id - maxComplete >= commands.length; + return id - maxComplete >= commands.length || commandBytes >= byteLimit; } public void invoke(Method m) @@ -542,6 +550,7 @@ public class Session extends SessionInvoker if (expiry > 0) { commands[mod(next, commands.length)] = m; + commandBytes += m.getBodySize(); } if (autoSync) { -- cgit v1.2.1