diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2006-12-22 17:00:28 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2006-12-22 17:00:28 +0000 |
| commit | 5129ac060aed57d8e31a62c3cd64ff0ad8995949 (patch) | |
| tree | 12616b7ae0cc57d7c3fb88025fd05cf31686d51a /java/cluster/src | |
| parent | 142d35580b326c99a306f6476ff0a0b723db920e (diff) | |
| download | qpid-python-5129ac060aed57d8e31a62c3cd64ff0ad8995949.tar.gz | |
AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489691 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
8 files changed, 54 insertions, 19 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index 07d572d27f..5209df59cd 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -112,7 +112,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, private void ping(Broker b) throws AMQException { - ClusterPingBody ping = new ClusterPingBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0); ping.broker = _group.getLocal().getDetails(); ping.responseRequired = true; ping.load = _loadTable.getLocalLoad(); @@ -158,7 +160,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, Broker leader = connectToLeader(member); _logger.info(new LogMessage("Connected to {0}. joining", leader)); - ClusterJoinBody join = new ClusterJoinBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); join.broker = _group.getLocal().getDetails(); send(leader, new SimpleSendable(join)); } @@ -177,7 +181,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, public void leave() throws AMQException { - ClusterLeaveBody leave = new ClusterLeaveBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); leave.broker = _group.getLocal().getDetails(); send(getLeader(), new SimpleSendable(leave)); } @@ -198,7 +204,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, } else { - ClusterSuspectBody suspect = new ClusterSuspectBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); suspect.broker = broker.getDetails(); send(getLeader(), new SimpleSendable(suspect)); } @@ -220,7 +228,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, else { //pass request on to leader: - ClusterJoinBody request = new ClusterJoinBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); request.broker = member.getDetails(); Broker leader = getLeader(); send(leader, new SimpleSendable(request)); @@ -265,7 +275,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, private ClusterMembershipBody createAnnouncement(String membership) { - ClusterMembershipBody announce = new ClusterMembershipBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0); //TODO: revise this way of converting String to bytes... announce.members = membership.getBytes(); return announce; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index 0836e9d5fa..93515e42b6 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -48,7 +48,13 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); - session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + evt.getMethod().queue // consumerTag + )); } else { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 3bd9f5d387..832e4830ab 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -51,7 +51,9 @@ class ConsumerCounts { for(String queue : _counts.keySet()) { - BasicConsumeBody m = new BasicConsumeBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0); m.queue = queue; m.consumerTag = queue; replay(m, messages); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index 4a00b5cbc3..ce3e71f0a5 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -44,15 +44,19 @@ import java.util.Arrays; public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + private final byte major = (byte)8; + private final byte minor = (byte)0; private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] { - new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody()), - new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody()), - new FrameDescriptor(QueueBindBody.class, new QueueBindBody()), - new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody()), - new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody()), - new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody()), - new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody()) + new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)), + new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor)), + new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)), + new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)), + new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)), + new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)), + new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor)) }); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index fa737cd1b6..338817e892 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -124,7 +124,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener } } _consumers.replay(methods); - methods.add(new ClusterSynchBody()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + methods.add(new ClusterSynchBody((byte)8, (byte)0)); return methods; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index ee16f6062f..8765aebf77 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -75,7 +75,9 @@ public class ClusteredQueue extends AMQQueue delete(); //send deletion request to all other members: - QueueDeleteBody request = new QueueDeleteBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); request.queue = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } @@ -87,7 +89,9 @@ public class ClusteredQueue extends AMQQueue super.unregisterProtocolSession(ps, channel, consumerTag); //signal other members: - BasicCancelBody request = new BasicCancelBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0); request.consumerTag = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index a3af0fedc7..94f17cb9d3 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -56,7 +56,9 @@ public class PrivateQueue extends AMQQueue super.autodelete(); //send delete request to peers: - QueueDeleteBody request = new QueueDeleteBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); request.queue = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java index f7fe5dc35a..ed18710c64 100644 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java @@ -148,6 +148,9 @@ public class BrokerTest extends TestCase TestMethod(Object id) { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + super((byte)8, (byte)0); this.id = id; } |
