diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-09 23:22:52 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-09 23:22:52 +0000 |
commit | 70f32b028e2395d5a9097a1f897e7ded54b7fb88 (patch) | |
tree | 2d415e281da8c842bb0e2bec78dd3c1047a7db9a /java/cluster/src | |
parent | f214abfececda76c2bf10b7fde18d2eab72ad749 (diff) | |
download | qpid-python-70f32b028e2395d5a9097a1f897e7ded54b7fb88.tar.gz |
QPID-268 : (Patch supplied by Rob Godfrey) Improvements to performance of generated code
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494650 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
5 files changed, 53 insertions, 28 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index d8d2220a8e..f9ec0eb878 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -108,10 +108,11 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0); - ping.broker = new AMQShortString(_group.getLocal().getDetails()); - ping.responseRequired = true; - ping.load = _loadTable.getLocalLoad(); + ClusterPingBody ping = new ClusterPingBody((byte)8, + (byte)0, + _group.getLocal().getDetails(), + _loadTable.getLocalLoad(), + true); BlockingHandler handler = new BlockingHandler(); send(getLeader(), new SimpleBodySendable(ping), handler); handler.waitForCompletion(); @@ -156,8 +157,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, _logger.info(new LogMessage("Connected to {0}. joining", leader)); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); - join.broker = new AMQShortString(_group.getLocal().getDetails()); + ClusterJoinBody join = new ClusterJoinBody((byte)8, + (byte)0, + _group.getLocal().getDetails()); + send(leader, new SimpleBodySendable(join)); } @@ -177,8 +180,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); - leave.broker = new AMQShortString(_group.getLocal().getDetails()); + ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, + (byte)0, + _group.getLocal().getDetails()); + send(getLeader(), new SimpleBodySendable(leave)); } @@ -200,8 +205,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); - suspect.broker = new AMQShortString(broker.getDetails()); + ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, + (byte)0, + broker.getDetails()); + send(getLeader(), new SimpleBodySendable(suspect)); } } @@ -224,8 +231,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, //pass request on to leader: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); - request.broker = new AMQShortString(member.getDetails()); + ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails()); + Broker leader = getLeader(); send(leader, new SimpleBodySendable(request)); _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader)); @@ -271,9 +278,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0); - //TODO: revise this way of converting String to bytes... - announce.members = membership.getBytes(); + ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes()); + + return announce; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 722ec1b256..aa16595095 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -54,7 +54,16 @@ class ConsumerCounts { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0); + BasicConsumeBody m = new BasicConsumeBody((byte)8, + (byte)0, + null, + queue, + false, + false, + false, + false, + queue, + 0); m.queue = queue; m.consumerTag = queue; replay(m, messages); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index ce3e71f0a5..243a28e5e8 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -50,13 +50,13 @@ public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory private final byte minor = (byte)0; private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] { - new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)), - new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor)), - new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)), - new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)), - new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)), - new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)), - new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor)) + new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)), + new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)), + new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)), + new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)), + new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)), + new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)), + new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false)) }); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 6898ffcec2..5cf6d5c3ff 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -79,8 +79,14 @@ public class ClusteredQueue extends AMQQueue //send deletion request to all other members: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); - request.queue = getName(); + QueueDeleteBody request = new QueueDeleteBody((byte)8, + (byte)0, + false, + false, + false, + getName(), + 0); + _groupMgr.broadcast(new SimpleBodySendable(request)); } } @@ -93,8 +99,11 @@ public class ClusteredQueue extends AMQQueue //signal other members: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0); - request.consumerTag = getName(); + BasicCancelBody request = new BasicCancelBody((byte)8, + (byte)0, + getName(), + false); + _groupMgr.broadcast(new SimpleBodySendable(request)); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 89ce0bc8b1..568de62d1b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -60,7 +60,7 @@ public class PrivateQueue extends AMQQueue //send delete request to peers: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0); request.queue = getName(); _groupMgr.broadcast(new SimpleBodySendable(request)); } |