From 0fb17bbed6930f3ec110bf87d5cd4f3b91e2bbc2 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 6 Jun 2008 03:39:03 +0000 Subject: QPID-1062: merge writes of separate frames within an assembly, use sync flag instead of sync command on message transfer git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@663813 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpidity/transport/Session.java | 28 +++++++++++- .../qpidity/transport/network/OutputHandler.java | 50 +++++++++++++++------- 2 files changed, 61 insertions(+), 17 deletions(-) (limited to 'qpid/java/common') diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java index c11ef46d36..d1ea23035a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -65,6 +65,7 @@ public class Session extends Invoker private byte[] name; private long timeout = 60000; + private boolean autoSync = false; // channel may be null Channel channel; @@ -80,6 +81,7 @@ public class Session extends Invoker private int commandsOut = 0; private Map commands = new HashMap(); private int maxComplete = commandsOut - 1; + private boolean needSync = false; private AtomicBoolean closed = new AtomicBoolean(false); @@ -93,6 +95,14 @@ public class Session extends Invoker return name; } + public void setAutoSync(boolean value) + { + synchronized (commands) + { + this.autoSync = value; + } + } + public Map getOutstandingCommands() { return commands; @@ -242,7 +252,16 @@ public class Session extends Invoker { commands.put(next, m); } + if (autoSync) + { + m.setSync(true); + } + needSync = !m.isSync(); channel.method(m); + if (autoSync && !m.hasPayload()) + { + sync(); + } } } else @@ -286,6 +305,13 @@ public class Session extends Invoker public void endData() { channel.end(); + synchronized (commands) + { + if (autoSync) + { + sync(); + } + } } public void sync() @@ -300,7 +326,7 @@ public class Session extends Invoker { int point = commandsOut - 1; - if (lt(maxComplete, point)) + if (needSync && lt(maxComplete, point)) { ExecutionSync sync = new ExecutionSync(); sync.setSync(true); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java index fa739cf637..b749332fa3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -22,6 +22,9 @@ package org.apache.qpidity.transport.network; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + import org.apache.qpidity.transport.Constant; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolHeader; @@ -40,6 +43,8 @@ public class OutputHandler implements Sender, NetworkDelegate private Sender sender; private Object lock = new Object(); + private int bytes = 0; + private List frames = new ArrayList(); public OutputHandler(Sender sender) { @@ -69,24 +74,37 @@ public class OutputHandler implements Sender, NetworkDelegate public void frame(Frame frame) { - ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + frame.getSize()); - buf.put(frame.getFlags()); - buf.put((byte) frame.getType().getValue()); - buf.putShort((short) (frame.getSize() + HEADER_SIZE)); - // RESERVED - buf.put(RESERVED); - buf.put(frame.getTrack()); - buf.putShort((short) frame.getChannel()); - // RESERVED - buf.putInt(0); - for(ByteBuffer frg : frame) - { - buf.put(frg); - } - buf.flip(); synchronized (lock) { - sender.send(buf); + frames.add(frame); + bytes += HEADER_SIZE + frame.getSize(); + + if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024) + { + ByteBuffer buf = ByteBuffer.allocate(bytes); + for (Frame f : frames) + { + buf.put(f.getFlags()); + buf.put((byte) f.getType().getValue()); + buf.putShort((short) (f.getSize() + HEADER_SIZE)); + // RESERVED + buf.put(RESERVED); + buf.put(f.getTrack()); + buf.putShort((short) f.getChannel()); + // RESERVED + buf.putInt(0); + for(ByteBuffer frg : f) + { + buf.put(frg); + } + } + buf.flip(); + + frames.clear(); + bytes = 0; + + sender.send(buf); + } } } -- cgit v1.2.1