diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-06-06 03:39:03 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-06-06 03:39:03 +0000 |
| commit | 0fb17bbed6930f3ec110bf87d5cd4f3b91e2bbc2 (patch) | |
| tree | e49e773c94e2e65b82fc83a5708646a59771d0db /qpid/java/common | |
| parent | e2cb0dd0e4a9fe86d5b7c6a3270f4fa5da80f43e (diff) | |
| download | qpid-python-0fb17bbed6930f3ec110bf87d5cd4f3b91e2bbc2.tar.gz | |
QPID-1062: merge writes of separate frames within an assembly, use sync flag instead of sync command on message transfer
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@663813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java | 28 | ||||
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java | 50 |
2 files changed, 61 insertions, 17 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java index c11ef46d36..d1ea23035a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -65,6 +65,7 @@ public class Session extends Invoker private byte[] name; private long timeout = 60000; + private boolean autoSync = false; // channel may be null Channel channel; @@ -80,6 +81,7 @@ public class Session extends Invoker private int commandsOut = 0; private Map<Integer,Method> commands = new HashMap<Integer,Method>(); private int maxComplete = commandsOut - 1; + private boolean needSync = false; private AtomicBoolean closed = new AtomicBoolean(false); @@ -93,6 +95,14 @@ public class Session extends Invoker return name; } + public void setAutoSync(boolean value) + { + synchronized (commands) + { + this.autoSync = value; + } + } + public Map<Integer,Method> getOutstandingCommands() { return commands; @@ -242,7 +252,16 @@ public class Session extends Invoker { commands.put(next, m); } + if (autoSync) + { + m.setSync(true); + } + needSync = !m.isSync(); channel.method(m); + if (autoSync && !m.hasPayload()) + { + sync(); + } } } else @@ -286,6 +305,13 @@ public class Session extends Invoker public void endData() { channel.end(); + synchronized (commands) + { + if (autoSync) + { + sync(); + } + } } public void sync() @@ -300,7 +326,7 @@ public class Session extends Invoker { int point = commandsOut - 1; - if (lt(maxComplete, point)) + if (needSync && lt(maxComplete, point)) { ExecutionSync sync = new ExecutionSync(); sync.setSync(true); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java index fa739cf637..b749332fa3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -22,6 +22,9 @@ package org.apache.qpidity.transport.network; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + import org.apache.qpidity.transport.Constant; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolHeader; @@ -40,6 +43,8 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate private Sender<ByteBuffer> sender; private Object lock = new Object(); + private int bytes = 0; + private List<Frame> frames = new ArrayList<Frame>(); public OutputHandler(Sender<ByteBuffer> sender) { @@ -69,24 +74,37 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate public void frame(Frame frame) { - ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + frame.getSize()); - buf.put(frame.getFlags()); - buf.put((byte) frame.getType().getValue()); - buf.putShort((short) (frame.getSize() + HEADER_SIZE)); - // RESERVED - buf.put(RESERVED); - buf.put(frame.getTrack()); - buf.putShort((short) frame.getChannel()); - // RESERVED - buf.putInt(0); - for(ByteBuffer frg : frame) - { - buf.put(frg); - } - buf.flip(); synchronized (lock) { - sender.send(buf); + frames.add(frame); + bytes += HEADER_SIZE + frame.getSize(); + + if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024) + { + ByteBuffer buf = ByteBuffer.allocate(bytes); + for (Frame f : frames) + { + buf.put(f.getFlags()); + buf.put((byte) f.getType().getValue()); + buf.putShort((short) (f.getSize() + HEADER_SIZE)); + // RESERVED + buf.put(RESERVED); + buf.put(f.getTrack()); + buf.putShort((short) f.getChannel()); + // RESERVED + buf.putInt(0); + for(ByteBuffer frg : f) + { + buf.put(frg); + } + } + buf.flip(); + + frames.clear(); + bytes = 0; + + sender.send(buf); + } } } |
