summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-06-06 03:39:03 +0000
committerRafael H. Schloming <rhs@apache.org>2008-06-06 03:39:03 +0000
commit0fb17bbed6930f3ec110bf87d5cd4f3b91e2bbc2 (patch)
treee49e773c94e2e65b82fc83a5708646a59771d0db /qpid/java/common
parente2cb0dd0e4a9fe86d5b7c6a3270f4fa5da80f43e (diff)
downloadqpid-python-0fb17bbed6930f3ec110bf87d5cd4f3b91e2bbc2.tar.gz
QPID-1062: merge writes of separate frames within an assembly, use sync flag instead of sync command on message transfer
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@663813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java28
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java50
2 files changed, 61 insertions, 17 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index c11ef46d36..d1ea23035a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -65,6 +65,7 @@ public class Session extends Invoker
private byte[] name;
private long timeout = 60000;
+ private boolean autoSync = false;
// channel may be null
Channel channel;
@@ -80,6 +81,7 @@ public class Session extends Invoker
private int commandsOut = 0;
private Map<Integer,Method> commands = new HashMap<Integer,Method>();
private int maxComplete = commandsOut - 1;
+ private boolean needSync = false;
private AtomicBoolean closed = new AtomicBoolean(false);
@@ -93,6 +95,14 @@ public class Session extends Invoker
return name;
}
+ public void setAutoSync(boolean value)
+ {
+ synchronized (commands)
+ {
+ this.autoSync = value;
+ }
+ }
+
public Map<Integer,Method> getOutstandingCommands()
{
return commands;
@@ -242,7 +252,16 @@ public class Session extends Invoker
{
commands.put(next, m);
}
+ if (autoSync)
+ {
+ m.setSync(true);
+ }
+ needSync = !m.isSync();
channel.method(m);
+ if (autoSync && !m.hasPayload())
+ {
+ sync();
+ }
}
}
else
@@ -286,6 +305,13 @@ public class Session extends Invoker
public void endData()
{
channel.end();
+ synchronized (commands)
+ {
+ if (autoSync)
+ {
+ sync();
+ }
+ }
}
public void sync()
@@ -300,7 +326,7 @@ public class Session extends Invoker
{
int point = commandsOut - 1;
- if (lt(maxComplete, point))
+ if (needSync && lt(maxComplete, point))
{
ExecutionSync sync = new ExecutionSync();
sync.setSync(true);
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
index fa739cf637..b749332fa3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
@@ -22,6 +22,9 @@ package org.apache.qpidity.transport.network;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.qpidity.transport.Constant;
import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolHeader;
@@ -40,6 +43,8 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
private Sender<ByteBuffer> sender;
private Object lock = new Object();
+ private int bytes = 0;
+ private List<Frame> frames = new ArrayList<Frame>();
public OutputHandler(Sender<ByteBuffer> sender)
{
@@ -69,24 +74,37 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
public void frame(Frame frame)
{
- ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + frame.getSize());
- buf.put(frame.getFlags());
- buf.put((byte) frame.getType().getValue());
- buf.putShort((short) (frame.getSize() + HEADER_SIZE));
- // RESERVED
- buf.put(RESERVED);
- buf.put(frame.getTrack());
- buf.putShort((short) frame.getChannel());
- // RESERVED
- buf.putInt(0);
- for(ByteBuffer frg : frame)
- {
- buf.put(frg);
- }
- buf.flip();
synchronized (lock)
{
- sender.send(buf);
+ frames.add(frame);
+ bytes += HEADER_SIZE + frame.getSize();
+
+ if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(bytes);
+ for (Frame f : frames)
+ {
+ buf.put(f.getFlags());
+ buf.put((byte) f.getType().getValue());
+ buf.putShort((short) (f.getSize() + HEADER_SIZE));
+ // RESERVED
+ buf.put(RESERVED);
+ buf.put(f.getTrack());
+ buf.putShort((short) f.getChannel());
+ // RESERVED
+ buf.putInt(0);
+ for(ByteBuffer frg : f)
+ {
+ buf.put(frg);
+ }
+ }
+ buf.flip();
+
+ frames.clear();
+ bytes = 0;
+
+ sender.send(buf);
+ }
}
}