summaryrefslogtreecommitdiff
path: root/java/cluster
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2006-12-22 17:00:28 +0000
committerKim van der Riet <kpvdr@apache.org>2006-12-22 17:00:28 +0000
commit5129ac060aed57d8e31a62c3cd64ff0ad8995949 (patch)
tree12616b7ae0cc57d7c3fb88025fd05cf31686d51a /java/cluster
parent142d35580b326c99a306f6476ff0a0b723db920e (diff)
downloadqpid-python-5129ac060aed57d8e31a62c3cd64ff0ad8995949.tar.gz
AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489691 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java24
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java18
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java4
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java3
8 files changed, 54 insertions, 19 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index 07d572d27f..5209df59cd 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -112,7 +112,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
private void ping(Broker b) throws AMQException
{
- ClusterPingBody ping = new ClusterPingBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0);
ping.broker = _group.getLocal().getDetails();
ping.responseRequired = true;
ping.load = _loadTable.getLocalLoad();
@@ -158,7 +160,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
Broker leader = connectToLeader(member);
_logger.info(new LogMessage("Connected to {0}. joining", leader));
- ClusterJoinBody join = new ClusterJoinBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0);
join.broker = _group.getLocal().getDetails();
send(leader, new SimpleSendable(join));
}
@@ -177,7 +181,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
public void leave() throws AMQException
{
- ClusterLeaveBody leave = new ClusterLeaveBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0);
leave.broker = _group.getLocal().getDetails();
send(getLeader(), new SimpleSendable(leave));
}
@@ -198,7 +204,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
}
else
{
- ClusterSuspectBody suspect = new ClusterSuspectBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0);
suspect.broker = broker.getDetails();
send(getLeader(), new SimpleSendable(suspect));
}
@@ -220,7 +228,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
else
{
//pass request on to leader:
- ClusterJoinBody request = new ClusterJoinBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0);
request.broker = member.getDetails();
Broker leader = getLeader();
send(leader, new SimpleSendable(request));
@@ -265,7 +275,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
private ClusterMembershipBody createAnnouncement(String membership)
{
- ClusterMembershipBody announce = new ClusterMembershipBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0);
//TODO: revise this way of converting String to bytes...
announce.members = membership.getBytes();
return announce;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
index 0836e9d5fa..93515e42b6 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
@@ -48,7 +48,13 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ evt.getMethod().queue // consumerTag
+ ));
}
else
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
index 3bd9f5d387..832e4830ab 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
@@ -51,7 +51,9 @@ class ConsumerCounts
{
for(String queue : _counts.keySet())
{
- BasicConsumeBody m = new BasicConsumeBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0);
m.queue = queue;
m.consumerTag = queue;
replay(m, messages);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
index 4a00b5cbc3..ce3e71f0a5 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
@@ -44,15 +44,19 @@ import java.util.Arrays;
public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ private final byte major = (byte)8;
+ private final byte minor = (byte)0;
private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
{
- new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody()),
- new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody()),
- new FrameDescriptor(QueueBindBody.class, new QueueBindBody()),
- new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody()),
- new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody()),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody()),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody())
+ new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)),
+ new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor)),
+ new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)),
+ new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)),
+ new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)),
+ new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)),
+ new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor))
});
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index fa737cd1b6..338817e892 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -124,7 +124,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
}
}
_consumers.replay(methods);
- methods.add(new ClusterSynchBody());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ methods.add(new ClusterSynchBody((byte)8, (byte)0));
return methods;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index ee16f6062f..8765aebf77 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -75,7 +75,9 @@ public class ClusteredQueue extends AMQQueue
delete();
//send deletion request to all other members:
- QueueDeleteBody request = new QueueDeleteBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0);
request.queue = getName();
_groupMgr.broadcast(new SimpleSendable(request));
}
@@ -87,7 +89,9 @@ public class ClusteredQueue extends AMQQueue
super.unregisterProtocolSession(ps, channel, consumerTag);
//signal other members:
- BasicCancelBody request = new BasicCancelBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0);
request.consumerTag = getName();
_groupMgr.broadcast(new SimpleSendable(request));
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index a3af0fedc7..94f17cb9d3 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
@@ -56,7 +56,9 @@ public class PrivateQueue extends AMQQueue
super.autodelete();
//send delete request to peers:
- QueueDeleteBody request = new QueueDeleteBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0);
request.queue = getName();
_groupMgr.broadcast(new SimpleSendable(request));
}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
index f7fe5dc35a..ed18710c64 100644
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
@@ -148,6 +148,9 @@ public class BrokerTest extends TestCase
TestMethod(Object id)
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ super((byte)8, (byte)0);
this.id = id;
}