summaryrefslogtreecommitdiff
path: root/java/cluster/test
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /java/cluster/test
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/test')
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/BrokerGroupTest.java267
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/BrokerTest.java231
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/ClusterCapabilityTest.java42
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/InductionBufferTest.java103
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/RecordingBroker.java50
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/RecordingBrokerFactory.java26
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/SimpleClusterTest.java41
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java59
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/TestBroker.java67
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/TestBrokerFactory.java26
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/TestReplayManager.java44
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/TestSession.java261
12 files changed, 1217 insertions, 0 deletions
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/BrokerGroupTest.java b/java/cluster/test/org/apache/qpid/server/cluster/BrokerGroupTest.java
new file mode 100644
index 0000000000..015e96f9c6
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/BrokerGroupTest.java
@@ -0,0 +1,267 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class BrokerGroupTest extends TestCase
+{
+ private final MemberHandle a = new SimpleMemberHandle("A", 1);
+ private final MemberHandle b = new SimpleMemberHandle("B", 1);
+ private final MemberHandle c = new SimpleMemberHandle("C", 1);
+ private final MemberHandle d = new SimpleMemberHandle("D", 1);
+
+ //join (new members perspective)
+ // (i) connectToLeader()
+ // ==> check state
+ // (ii) setMembers()
+ // ==> check state
+ // ==> check members
+ // (iii) synched(leader)
+ // ==> check state
+ // ==> check peers
+ // (iv) synched(other)
+ // ==> check state
+ // ==> check peers
+ // repeat for all others
+ public void testJoin_newMember() throws Exception
+ {
+ MemberHandle[] pre = new MemberHandle[]{a, b, c};
+ MemberHandle[] post = new MemberHandle[]{a, b, c};
+
+ BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory());
+ assertEquals(JoinState.UNINITIALISED, group.getState());
+ //(i)
+ group.connectToLeader(a);
+ assertEquals(JoinState.JOINING, group.getState());
+ assertEquals("Wrong number of peers", 1, group.getPeers().size());
+ //(ii)
+ group.setMembers(Arrays.asList(post));
+ assertEquals(JoinState.INITIATION, group.getState());
+ assertEquals(Arrays.asList(post), group.getMembers());
+ //(iii) & (iv)
+ for (MemberHandle member : pre)
+ {
+ group.synched(member);
+ if (member == c)
+ {
+ assertEquals(JoinState.JOINED, group.getState());
+ assertEquals("Wrong number of peers", pre.length, group.getPeers().size());
+ }
+ else
+ {
+ assertEquals(JoinState.INDUCTION, group.getState());
+ assertEquals("Wrong number of peers", 1, group.getPeers().size());
+ }
+ }
+ }
+
+ //join (leaders perspective)
+ // (i) extablish()
+ // ==> check state
+ // ==> check members
+ // ==> check peers
+ // (ii) connectToProspect()
+ // ==> check members
+ // ==> check peers
+ // repeat (ii)
+ public void testJoin_Leader() throws IOException, InterruptedException
+ {
+ MemberHandle[] prospects = new MemberHandle[]{b, c, d};
+
+ BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory());
+ assertEquals(JoinState.UNINITIALISED, group.getState());
+ //(i)
+ group.establish();
+ assertEquals(JoinState.JOINED, group.getState());
+ assertEquals("Wrong number of peers", 0, group.getPeers().size());
+ assertEquals("Wrong number of members", 1, group.getMembers().size());
+ assertEquals(a, group.getMembers().get(0));
+ //(ii)
+ for (int i = 0; i < prospects.length; i++)
+ {
+ group.connectToProspect(prospects[i]);
+ assertEquals("Wrong number of peers", i + 1, group.getPeers().size());
+ for (int j = 0; j <= i; j++)
+ {
+ assertTrue(prospects[i].matches(group.getPeers().get(i)));
+ }
+ assertEquals("Wrong number of members", i + 2, group.getMembers().size());
+ assertEquals(a, group.getMembers().get(0));
+ for (int j = 0; j <= i; j++)
+ {
+ assertEquals(prospects[i], group.getMembers().get(i + 1));
+ }
+ }
+ }
+
+ //join (general perspective)
+ // (i) set up group
+ // (ii) setMembers()
+ // ==> check members
+ // ==> check peers
+ public void testJoin_general() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] peers = new MemberHandle[]{a, b, d};
+
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ //(ii)
+ group.setMembers(Arrays.asList(view2));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(peers.length, group.getPeers().size());
+ for (int i = 0; i < peers.length; i++)
+ {
+ assertTrue(peers[i].matches(group.getPeers().get(i)));
+ }
+ }
+
+ //leadership transfer (valid)
+ // (i) set up group
+ // (ii) assumeLeadership()
+ // ==> check return value
+ // ==> check members
+ // ==> check peers
+ // ==> check isLeader()
+ // ==> check isLeader(old_leader)
+ // ==> check isMember(old_leader)
+ public void testTransferLeadership_valid() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view3 = new MemberHandle[]{b, c, d};
+
+ BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ group.setMembers(Arrays.asList(view2));
+ //(ii)
+ boolean result = group.assumeLeadership();
+ assertTrue(result);
+ assertTrue(group.isLeader());
+ assertFalse(group.isLeader(a));
+ assertEquals(Arrays.asList(view3), group.getMembers());
+ assertEquals(2, group.getPeers().size());
+ assertTrue(c.matches(group.getPeers().get(0)));
+ assertTrue(d.matches(group.getPeers().get(1)));
+ }
+
+ //leadership transfer (invalid)
+ // (i) set up group
+ // (ii) assumeLeadership()
+ // ==> check return value
+ // ==> check members
+ // ==> check peers
+ // ==> check isLeader()
+ // ==> check isLeader(old_leader)
+ // ==> check isMember(old_leader)
+ public void testTransferLeadership_invalid() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ group.setMembers(Arrays.asList(view2));
+ //(ii)
+ boolean result = group.assumeLeadership();
+ assertFalse(result);
+ assertFalse(group.isLeader());
+ assertTrue(group.isLeader(a));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(3, group.getPeers().size());
+ assertTrue(a.matches(group.getPeers().get(0)));
+ assertTrue(b.matches(group.getPeers().get(1)));
+ assertTrue(d.matches(group.getPeers().get(2)));
+
+ }
+
+ //leave (leaders perspective)
+ // (i) set up group
+ // (ii) remove a member
+ // ==> check members
+ // ==> check peers
+ // ==> check isMember(removed_member)
+ // repeat (ii)
+ public void testLeave_leader()
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, d};
+ MemberHandle[] view3 = new MemberHandle[]{a, d};
+ MemberHandle[] view4 = new MemberHandle[]{a};
+ //(i)
+ BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory());
+ group.establish();
+ group.setMembers(Arrays.asList(view1));
+ //(ii)
+ group.remove(group.findBroker(c, false));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+
+ group.remove(group.findBroker(b, false));
+ assertEquals(Arrays.asList(view3), group.getMembers());
+
+ group.remove(group.findBroker(d, false));
+ assertEquals(Arrays.asList(view4), group.getMembers());
+ }
+
+
+ //leave (general perspective)
+ // (i) set up group
+ // (ii) setMember
+ // ==> check members
+ // ==> check peers
+ // ==> check isMember(removed_member)
+ // repeat (ii)
+ public void testLeave_general()
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view2 = new MemberHandle[]{a, c, d};
+ //(i)
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ group.establish(); //not strictly the correct way to build up the group, but ok for here
+ group.setMembers(Arrays.asList(view1));
+ //(ii)
+ group.setMembers(Arrays.asList(view2));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(2, group.getPeers().size());
+ assertTrue(a.matches(group.getPeers().get(0)));
+ assertTrue(d.matches(group.getPeers().get(1)));
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/test/org/apache/qpid/server/cluster/BrokerTest.java
new file mode 100644
index 0000000000..d7ede7d3e0
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/BrokerTest.java
@@ -0,0 +1,231 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.policy.StandardPolicies;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class BrokerTest extends TestCase
+{
+ //group request (no failure)
+ public void testGroupRequest_noFailure() throws AMQException
+ {
+ RecordingBroker[] brokers = new RecordingBroker[]{
+ new RecordingBroker("A", 1),
+ new RecordingBroker("B", 2),
+ new RecordingBroker("C", 3)
+ };
+ GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers)));
+ GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+ for (Broker b : brokers)
+ {
+ b.invoke(grpRequest);
+ }
+ grpRequest.finishedSend();
+
+ for (RecordingBroker b : brokers)
+ {
+ b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response"));
+ }
+
+ assertTrue("Handler did not receive response", handler.isCompleted());
+ }
+
+ //group request (failure)
+ public void testGroupRequest_failure() throws AMQException
+ {
+ RecordingBroker a = new RecordingBroker("A", 1);
+ RecordingBroker b = new RecordingBroker("B", 2);
+ RecordingBroker c = new RecordingBroker("C", 3);
+ RecordingBroker[] all = new RecordingBroker[]{a, b, c};
+ RecordingBroker[] succeeded = new RecordingBroker[]{a, c};
+
+ GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded)));
+ GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+
+ for (Broker broker : all)
+ {
+ broker.invoke(grpRequest);
+ }
+ grpRequest.finishedSend();
+
+ for (RecordingBroker broker : succeeded)
+ {
+ broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response"));
+ }
+ b.remove();
+
+ assertTrue("Handler did not receive response", handler.isCompleted());
+ }
+
+
+ //simple send (no response)
+ public void testSend_noResponse() throws AMQException
+ {
+ AMQBody[] msgs = new AMQBody[]{
+ new TestMethod("A"),
+ new TestMethod("B"),
+ new TestMethod("C")
+ };
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ for (AMQBody msg : msgs)
+ {
+ broker.send(new SimpleSendable(msg), null);
+ }
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(msgs.length, sent.size());
+ for (int i = 0; i < msgs.length; i++)
+ {
+ assertTrue(sent.get(i) instanceof AMQFrame);
+ assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+ }
+ }
+
+ //simple send (no failure)
+ public void testSend_noFailure() throws AMQException
+ {
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ BlockingHandler handler = new BlockingHandler();
+ broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(1, sent.size());
+ assertTrue(sent.get(0) instanceof AMQFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+
+ broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B"));
+
+ assertEquals(new TestMethod("B"), handler.getResponse());
+ }
+
+ //simple send (failure)
+ public void testSend_failure() throws AMQException
+ {
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ BlockingHandler handler = new BlockingHandler();
+ broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(1, sent.size());
+ assertTrue(sent.get(0) instanceof AMQFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ broker.remove();
+ assertEquals(null, handler.getResponse());
+ assertTrue(handler.isCompleted());
+ assertTrue(handler.failed());
+ }
+
+ private static class TestMethod extends AMQMethodBody
+ {
+ private final Object id;
+
+ TestMethod(Object id)
+ {
+ this.id = id;
+ }
+
+ protected int getBodySize()
+ {
+ return 0;
+ }
+
+ protected int getClazz()
+ {
+ return 1002;
+ }
+
+ protected int getMethod()
+ {
+ return 1003;
+ }
+
+ protected void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ protected byte getType()
+ {
+ return 0;
+ }
+
+ protected int getSize()
+ {
+ return 0;
+ }
+
+ protected void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof TestMethod && id.equals(((TestMethod) o).id);
+ }
+
+ public int hashCode()
+ {
+ return id.hashCode();
+ }
+
+ }
+
+ private static class GroupResponseValidator implements GroupResponseHandler
+ {
+ private final AMQMethodBody _response;
+ private final List<Member> _members;
+ private boolean _completed = false;
+
+ GroupResponseValidator(AMQMethodBody response, List<Member> members)
+ {
+ _response = response;
+ _members = members;
+ }
+
+ public void response(List<AMQMethodBody> responses, List<Member> members)
+ {
+ for (AMQMethodBody r : responses)
+ {
+ assertEquals(_response, r);
+ }
+ assertEquals(_members, members);
+ _completed = true;
+ }
+
+ boolean isCompleted()
+ {
+ return _completed;
+ }
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/test/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
new file mode 100644
index 0000000000..132ebd8ca0
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class ClusterCapabilityTest
+{
+ @Test
+ public void startWithNull()
+ {
+ MemberHandle peer = new SimpleMemberHandle("myhost:9999");
+ String c = ClusterCapability.add(null, peer);
+ assertTrue(ClusterCapability.contains(c));
+ assertTrue(peer.matches(ClusterCapability.getPeer(c)));
+ }
+
+ @Test
+ public void startWithText()
+ {
+ MemberHandle peer = new SimpleMemberHandle("myhost:9999");
+ String c = ClusterCapability.add("existing text", peer);
+ assertTrue(ClusterCapability.contains(c));
+ assertTrue(peer.matches(ClusterCapability.getPeer(c)));
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/InductionBufferTest.java b/java/cluster/test/org/apache/qpid/server/cluster/InductionBufferTest.java
new file mode 100644
index 0000000000..fedf47d49a
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/InductionBufferTest.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.IoSession;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+public class InductionBufferTest extends TestCase
+{
+ public void test() throws Exception
+ {
+ IoSession session1 = new TestSession();
+ IoSession session2 = new TestSession();
+ IoSession session3 = new TestSession();
+
+ TestMessageHandler handler = new TestMessageHandler();
+ InductionBuffer buffer = new InductionBuffer(handler);
+
+ buffer.receive(session1, "one");
+ buffer.receive(session2, "two");
+ buffer.receive(session3, "three");
+
+ buffer.receive(session1, "four");
+ buffer.receive(session1, "five");
+ buffer.receive(session1, "six");
+
+ buffer.receive(session3, "seven");
+ buffer.receive(session3, "eight");
+
+ handler.checkEmpty();
+ buffer.deliver();
+
+ handler.check(session1, "one");
+ handler.check(session2, "two");
+ handler.check(session3, "three");
+
+ handler.check(session1, "four");
+ handler.check(session1, "five");
+ handler.check(session1, "six");
+
+ handler.check(session3, "seven");
+ handler.check(session3, "eight");
+ handler.checkEmpty();
+
+ buffer.receive(session1, "nine");
+ buffer.receive(session2, "ten");
+ buffer.receive(session3, "eleven");
+
+ handler.check(session1, "nine");
+ handler.check(session2, "ten");
+ handler.check(session3, "eleven");
+
+ handler.checkEmpty();
+ }
+
+ private static class TestMessageHandler implements InductionBuffer.MessageHandler
+ {
+ private final List<IoSession> _sessions = new ArrayList<IoSession>();
+ private final List<Object> _msgs = new ArrayList<Object>();
+
+ public synchronized void deliver(IoSession session, Object msg) throws Exception
+ {
+ _sessions.add(session);
+ _msgs.add(msg);
+ }
+
+ void check(IoSession actualSession, Object actualMsg)
+ {
+ assertFalse(_sessions.isEmpty());
+ assertFalse(_msgs.isEmpty());
+ IoSession expectedSession = _sessions.remove(0);
+ Object expectedMsg = _msgs.remove(0);
+ assertEquals(expectedSession, actualSession);
+ assertEquals(expectedMsg, actualMsg);
+ }
+
+ void checkEmpty()
+ {
+ assertTrue(_sessions.isEmpty());
+ assertTrue(_msgs.isEmpty());
+ }
+ }
+}
+
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/RecordingBroker.java b/java/cluster/test/org/apache/qpid/server/cluster/RecordingBroker.java
new file mode 100644
index 0000000000..388d584288
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/RecordingBroker.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class RecordingBroker extends TestBroker
+{
+ private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>();
+
+ RecordingBroker(String host, int port)
+ {
+ super(host, port);
+ }
+
+ public void send(AMQDataBlock data) throws AMQException
+ {
+ _messages.add(data);
+ }
+
+ List<AMQDataBlock> getMessages()
+ {
+ return _messages;
+ }
+
+ void clear()
+ {
+ _messages.clear();
+ }
+
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/java/cluster/test/org/apache/qpid/server/cluster/RecordingBrokerFactory.java
new file mode 100644
index 0000000000..e5e95323af
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/RecordingBrokerFactory.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+class RecordingBrokerFactory implements BrokerFactory
+{
+ public Broker create(MemberHandle handle)
+ {
+ return new RecordingBroker(handle.getHost(), handle.getPort());
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/test/org/apache/qpid/server/cluster/SimpleClusterTest.java
new file mode 100644
index 0000000000..a4d13ea46d
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/SimpleClusterTest.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.junit.Test;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.JMSException;
+
+public class SimpleClusterTest
+{
+ @Test
+ public void declareExchange() throws AMQException, JMSException, URLSyntaxException
+ {
+ AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test");
+ AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ System.out.println("Session created");
+ session.declareExchange("my_exchange", "direct");
+ System.out.println("Exchange declared");
+ con.close();
+ System.out.println("Connection closed");
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/java/cluster/test/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java
new file mode 100644
index 0000000000..f7c728759b
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class SimpleMemberHandleTest
+{
+ @Test
+ public void matches()
+ {
+ assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888));
+ assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888));
+ assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888));
+ }
+
+
+ @Test
+ public void resolve()
+ {
+ assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000"));
+ }
+
+ private void assertEquivalent(MemberHandle a, MemberHandle b)
+ {
+ String msg = a + " is not equivalent to " + b;
+ a = SimpleMemberHandle.resolve(a);
+ b = SimpleMemberHandle.resolve(b);
+ msg += "(" + a + " does not match " + b + ")";
+ assertTrue(msg, a.matches(b));
+ }
+
+ private void assertMatch(MemberHandle a, MemberHandle b)
+ {
+ assertTrue(a + " does not match " + b, a.matches(b));
+ }
+
+ private void assertNoMatch(MemberHandle a, MemberHandle b)
+ {
+ assertFalse(a + " matches " + b, a.matches(b));
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestBroker.java b/java/cluster/test/org/apache/qpid/server/cluster/TestBroker.java
new file mode 100644
index 0000000000..c4a1985ae3
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/TestBroker.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+
+import java.io.IOException;
+
+class TestBroker extends Broker
+{
+ TestBroker(String host, int port)
+ {
+ super(host, port);
+ }
+
+ boolean connect() throws IOException, InterruptedException
+ {
+ return true;
+ }
+
+ void connectAsynch(Iterable<AMQMethodBody> msgs)
+ {
+ replay(msgs);
+ }
+
+ void replay(Iterable<AMQMethodBody> msgs)
+ {
+ try
+ {
+ for (AMQMethodBody b : msgs)
+ {
+ send(new AMQFrame(0, b));
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ Broker connectToCluster() throws IOException, InterruptedException
+ {
+ return this;
+ }
+
+ public void send(AMQDataBlock data) throws AMQException
+ {
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestBrokerFactory.java b/java/cluster/test/org/apache/qpid/server/cluster/TestBrokerFactory.java
new file mode 100644
index 0000000000..cd4e340925
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/TestBrokerFactory.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+class TestBrokerFactory implements BrokerFactory
+{
+ public Broker create(MemberHandle handle)
+ {
+ return new TestBroker(handle.getHost(), handle.getPort());
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestReplayManager.java b/java/cluster/test/org/apache/qpid/server/cluster/TestReplayManager.java
new file mode 100644
index 0000000000..e2d6f75f19
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/TestReplayManager.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.replay.ReplayManager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class TestReplayManager implements ReplayManager
+{
+ private final List<AMQMethodBody> _msgs;
+
+ TestReplayManager()
+ {
+ this(new ArrayList<AMQMethodBody>());
+ }
+
+ TestReplayManager(List<AMQMethodBody> msgs)
+ {
+ _msgs = msgs;
+ }
+
+ public List<AMQMethodBody> replay(boolean isLeader)
+ {
+ return _msgs;
+ }
+}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
new file mode 100644
index 0000000000..c66fe27968
--- /dev/null
+++ b/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.*;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+class TestSession implements IoSession
+{
+ public IoService getService()
+ {
+ return null; //TODO
+ }
+
+ public IoHandler getHandler()
+ {
+ return null; //TODO
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null; //TODO
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null; //TODO
+ }
+
+ public WriteFuture write(Object message)
+ {
+ return null; //TODO
+ }
+
+ public CloseFuture close()
+ {
+ return null; //TODO
+ }
+
+ public Object getAttachment()
+ {
+ return null; //TODO
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ return null; //TODO
+ }
+
+ public Object getAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return null; //TODO
+ }
+
+ public Object setAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return false; //TODO
+ }
+
+ public Set getAttributeKeys()
+ {
+ return null; //TODO
+ }
+
+ public TransportType getTransportType()
+ {
+ return null; //TODO
+ }
+
+ public boolean isConnected()
+ {
+ return false; //TODO
+ }
+
+ public boolean isClosing()
+ {
+ return false; //TODO
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null; //TODO
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+ //TODO
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0; //TODO
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0; //TODO
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+ //TODO
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null; //TODO
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+ //TODO
+ }
+
+ public void suspendRead()
+ {
+ //TODO
+ }
+
+ public void suspendWrite()
+ {
+ //TODO
+ }
+
+ public void resumeRead()
+ {
+ //TODO
+ }
+
+ public void resumeWrite()
+ {
+ //TODO
+ }
+
+ public long getReadBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getReadMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0; //TODO
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0; //TODO
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getCreationTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastIoTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastReadTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0; //TODO
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false; //TODO
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+}