diff options
Diffstat (limited to 'java/cluster/src')
8 files changed, 29 insertions, 29 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index 5209df59cd..b72e95b35e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -112,9 +112,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, private void ping(Broker b) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0); + ClusterPingBody ping = new ClusterPingBody((byte)0, (byte)9); ping.broker = _group.getLocal().getDetails(); ping.responseRequired = true; ping.load = _loadTable.getLocalLoad(); @@ -160,9 +160,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, Broker leader = connectToLeader(member); _logger.info(new LogMessage("Connected to {0}. joining", leader)); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); + ClusterJoinBody join = new ClusterJoinBody((byte)0, (byte)9); join.broker = _group.getLocal().getDetails(); send(leader, new SimpleSendable(join)); } @@ -181,9 +181,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, public void leave() throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); + ClusterLeaveBody leave = new ClusterLeaveBody((byte)0, (byte)9); leave.broker = _group.getLocal().getDetails(); send(getLeader(), new SimpleSendable(leave)); } @@ -204,9 +204,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, } else { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); + ClusterSuspectBody suspect = new ClusterSuspectBody((byte)0, (byte)9); suspect.broker = broker.getDetails(); send(getLeader(), new SimpleSendable(suspect)); } @@ -228,9 +228,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, else { //pass request on to leader: - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); + ClusterJoinBody request = new ClusterJoinBody((byte)0, (byte)9); request.broker = member.getDetails(); Broker leader = getLeader(); send(leader, new SimpleSendable(request)); @@ -275,9 +275,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, private ClusterMembershipBody createAnnouncement(String membership) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0); + ClusterMembershipBody announce = new ClusterMembershipBody((byte)0, (byte)9); //TODO: revise this way of converting String to bytes... announce.members = membership.getBytes(); return announce; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index 93515e42b6..4d3f1261b2 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -48,11 +48,11 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) + (byte)0, (byte)9, // AMQP version (major, minor) evt.getMethod().queue // consumerTag )); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 832e4830ab..a601021bf1 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -51,9 +51,9 @@ class ConsumerCounts { for(String queue : _counts.keySet()) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0); + BasicConsumeBody m = new BasicConsumeBody((byte)0, (byte)9); m.queue = queue; m.consumerTag = queue; replay(m, messages); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index ce3e71f0a5..a1058c6ff6 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -44,10 +44,10 @@ import java.util.Arrays; public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - private final byte major = (byte)8; - private final byte minor = (byte)0; + private final byte major = (byte)0; + private final byte minor = (byte)9; private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] { new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)), diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index 338817e892..cf931e8306 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -124,9 +124,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener } } _consumers.replay(methods); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - methods.add(new ClusterSynchBody((byte)8, (byte)0)); + methods.add(new ClusterSynchBody((byte)0, (byte)9)); return methods; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 8765aebf77..c5b4c1c4b5 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -75,9 +75,9 @@ public class ClusteredQueue extends AMQQueue delete(); //send deletion request to all other members: - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); + QueueDeleteBody request = new QueueDeleteBody((byte)0, (byte)9); request.queue = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } @@ -89,9 +89,9 @@ public class ClusteredQueue extends AMQQueue super.unregisterProtocolSession(ps, channel, consumerTag); //signal other members: - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0); + BasicCancelBody request = new BasicCancelBody((byte)0, (byte)9); request.consumerTag = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 94f17cb9d3..2e291b0a3a 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -56,9 +56,9 @@ public class PrivateQueue extends AMQQueue super.autodelete(); //send delete request to peers: - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); + QueueDeleteBody request = new QueueDeleteBody((byte)0, (byte)9); request.queue = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java index ed18710c64..9a73a44aa7 100644 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java @@ -148,9 +148,9 @@ public class BrokerTest extends TestCase TestMethod(Object id) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - super((byte)8, (byte)0); + super((byte)0, (byte)9); this.id = id; } |
