diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
| commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
| tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /java/cluster/test | |
| download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz | |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/test')
12 files changed, 1217 insertions, 0 deletions
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/BrokerGroupTest.java b/java/cluster/test/org/apache/qpid/server/cluster/BrokerGroupTest.java new file mode 100644 index 0000000000..015e96f9c6 --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/BrokerGroupTest.java @@ -0,0 +1,267 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; + +import java.io.IOException; +import java.util.Arrays; + +public class BrokerGroupTest extends TestCase +{ + private final MemberHandle a = new SimpleMemberHandle("A", 1); + private final MemberHandle b = new SimpleMemberHandle("B", 1); + private final MemberHandle c = new SimpleMemberHandle("C", 1); + private final MemberHandle d = new SimpleMemberHandle("D", 1); + + //join (new members perspective) + // (i) connectToLeader() + // ==> check state + // (ii) setMembers() + // ==> check state + // ==> check members + // (iii) synched(leader) + // ==> check state + // ==> check peers + // (iv) synched(other) + // ==> check state + // ==> check peers + // repeat for all others + public void testJoin_newMember() throws Exception + { + MemberHandle[] pre = new MemberHandle[]{a, b, c}; + MemberHandle[] post = new MemberHandle[]{a, b, c}; + + BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory()); + assertEquals(JoinState.UNINITIALISED, group.getState()); + //(i) + group.connectToLeader(a); + assertEquals(JoinState.JOINING, group.getState()); + assertEquals("Wrong number of peers", 1, group.getPeers().size()); + //(ii) + group.setMembers(Arrays.asList(post)); + assertEquals(JoinState.INITIATION, group.getState()); + assertEquals(Arrays.asList(post), group.getMembers()); + //(iii) & (iv) + for (MemberHandle member : pre) + { + group.synched(member); + if (member == c) + { + assertEquals(JoinState.JOINED, group.getState()); + assertEquals("Wrong number of peers", pre.length, group.getPeers().size()); + } + else + { + assertEquals(JoinState.INDUCTION, group.getState()); + assertEquals("Wrong number of peers", 1, group.getPeers().size()); + } + } + } + + //join (leaders perspective) + // (i) extablish() + // ==> check state + // ==> check members + // ==> check peers + // (ii) connectToProspect() + // ==> check members + // ==> check peers + // repeat (ii) + public void testJoin_Leader() throws IOException, InterruptedException + { + MemberHandle[] prospects = new MemberHandle[]{b, c, d}; + + BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); + assertEquals(JoinState.UNINITIALISED, group.getState()); + //(i) + group.establish(); + assertEquals(JoinState.JOINED, group.getState()); + assertEquals("Wrong number of peers", 0, group.getPeers().size()); + assertEquals("Wrong number of members", 1, group.getMembers().size()); + assertEquals(a, group.getMembers().get(0)); + //(ii) + for (int i = 0; i < prospects.length; i++) + { + group.connectToProspect(prospects[i]); + assertEquals("Wrong number of peers", i + 1, group.getPeers().size()); + for (int j = 0; j <= i; j++) + { + assertTrue(prospects[i].matches(group.getPeers().get(i))); + } + assertEquals("Wrong number of members", i + 2, group.getMembers().size()); + assertEquals(a, group.getMembers().get(0)); + for (int j = 0; j <= i; j++) + { + assertEquals(prospects[i], group.getMembers().get(i + 1)); + } + } + } + + //join (general perspective) + // (i) set up group + // (ii) setMembers() + // ==> check members + // ==> check peers + public void testJoin_general() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] peers = new MemberHandle[]{a, b, d}; + + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + //(ii) + group.setMembers(Arrays.asList(view2)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(peers.length, group.getPeers().size()); + for (int i = 0; i < peers.length; i++) + { + assertTrue(peers[i].matches(group.getPeers().get(i))); + } + } + + //leadership transfer (valid) + // (i) set up group + // (ii) assumeLeadership() + // ==> check return value + // ==> check members + // ==> check peers + // ==> check isLeader() + // ==> check isLeader(old_leader) + // ==> check isMember(old_leader) + public void testTransferLeadership_valid() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view3 = new MemberHandle[]{b, c, d}; + + BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + group.setMembers(Arrays.asList(view2)); + //(ii) + boolean result = group.assumeLeadership(); + assertTrue(result); + assertTrue(group.isLeader()); + assertFalse(group.isLeader(a)); + assertEquals(Arrays.asList(view3), group.getMembers()); + assertEquals(2, group.getPeers().size()); + assertTrue(c.matches(group.getPeers().get(0))); + assertTrue(d.matches(group.getPeers().get(1))); + } + + //leadership transfer (invalid) + // (i) set up group + // (ii) assumeLeadership() + // ==> check return value + // ==> check members + // ==> check peers + // ==> check isLeader() + // ==> check isLeader(old_leader) + // ==> check isMember(old_leader) + public void testTransferLeadership_invalid() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + group.setMembers(Arrays.asList(view2)); + //(ii) + boolean result = group.assumeLeadership(); + assertFalse(result); + assertFalse(group.isLeader()); + assertTrue(group.isLeader(a)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(3, group.getPeers().size()); + assertTrue(a.matches(group.getPeers().get(0))); + assertTrue(b.matches(group.getPeers().get(1))); + assertTrue(d.matches(group.getPeers().get(2))); + + } + + //leave (leaders perspective) + // (i) set up group + // (ii) remove a member + // ==> check members + // ==> check peers + // ==> check isMember(removed_member) + // repeat (ii) + public void testLeave_leader() + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view2 = new MemberHandle[]{a, b, d}; + MemberHandle[] view3 = new MemberHandle[]{a, d}; + MemberHandle[] view4 = new MemberHandle[]{a}; + //(i) + BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); + group.establish(); + group.setMembers(Arrays.asList(view1)); + //(ii) + group.remove(group.findBroker(c, false)); + assertEquals(Arrays.asList(view2), group.getMembers()); + + group.remove(group.findBroker(b, false)); + assertEquals(Arrays.asList(view3), group.getMembers()); + + group.remove(group.findBroker(d, false)); + assertEquals(Arrays.asList(view4), group.getMembers()); + } + + + //leave (general perspective) + // (i) set up group + // (ii) setMember + // ==> check members + // ==> check peers + // ==> check isMember(removed_member) + // repeat (ii) + public void testLeave_general() + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view2 = new MemberHandle[]{a, c, d}; + //(i) + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + group.establish(); //not strictly the correct way to build up the group, but ok for here + group.setMembers(Arrays.asList(view1)); + //(ii) + group.setMembers(Arrays.asList(view2)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(2, group.getPeers().size()); + assertTrue(a.matches(group.getPeers().get(0))); + assertTrue(d.matches(group.getPeers().get(1))); + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/test/org/apache/qpid/server/cluster/BrokerTest.java new file mode 100644 index 0000000000..d7ede7d3e0 --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/BrokerTest.java @@ -0,0 +1,231 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.policy.StandardPolicies; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class BrokerTest extends TestCase +{ + //group request (no failure) + public void testGroupRequest_noFailure() throws AMQException + { + RecordingBroker[] brokers = new RecordingBroker[]{ + new RecordingBroker("A", 1), + new RecordingBroker("B", 2), + new RecordingBroker("C", 3) + }; + GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers))); + GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + for (Broker b : brokers) + { + b.invoke(grpRequest); + } + grpRequest.finishedSend(); + + for (RecordingBroker b : brokers) + { + b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response")); + } + + assertTrue("Handler did not receive response", handler.isCompleted()); + } + + //group request (failure) + public void testGroupRequest_failure() throws AMQException + { + RecordingBroker a = new RecordingBroker("A", 1); + RecordingBroker b = new RecordingBroker("B", 2); + RecordingBroker c = new RecordingBroker("C", 3); + RecordingBroker[] all = new RecordingBroker[]{a, b, c}; + RecordingBroker[] succeeded = new RecordingBroker[]{a, c}; + + GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded))); + GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + + for (Broker broker : all) + { + broker.invoke(grpRequest); + } + grpRequest.finishedSend(); + + for (RecordingBroker broker : succeeded) + { + broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response")); + } + b.remove(); + + assertTrue("Handler did not receive response", handler.isCompleted()); + } + + + //simple send (no response) + public void testSend_noResponse() throws AMQException + { + AMQBody[] msgs = new AMQBody[]{ + new TestMethod("A"), + new TestMethod("B"), + new TestMethod("C") + }; + RecordingBroker broker = new RecordingBroker("myhost", 1); + for (AMQBody msg : msgs) + { + broker.send(new SimpleSendable(msg), null); + } + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(msgs.length, sent.size()); + for (int i = 0; i < msgs.length; i++) + { + assertTrue(sent.get(i) instanceof AMQFrame); + assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame); + } + } + + //simple send (no failure) + public void testSend_noFailure() throws AMQException + { + RecordingBroker broker = new RecordingBroker("myhost", 1); + BlockingHandler handler = new BlockingHandler(); + broker.send(new SimpleSendable(new TestMethod("A")), handler); + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(1, sent.size()); + assertTrue(sent.get(0) instanceof AMQFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame); + + broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B")); + + assertEquals(new TestMethod("B"), handler.getResponse()); + } + + //simple send (failure) + public void testSend_failure() throws AMQException + { + RecordingBroker broker = new RecordingBroker("myhost", 1); + BlockingHandler handler = new BlockingHandler(); + broker.send(new SimpleSendable(new TestMethod("A")), handler); + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(1, sent.size()); + assertTrue(sent.get(0) instanceof AMQFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame); + broker.remove(); + assertEquals(null, handler.getResponse()); + assertTrue(handler.isCompleted()); + assertTrue(handler.failed()); + } + + private static class TestMethod extends AMQMethodBody + { + private final Object id; + + TestMethod(Object id) + { + this.id = id; + } + + protected int getBodySize() + { + return 0; + } + + protected int getClazz() + { + return 1002; + } + + protected int getMethod() + { + return 1003; + } + + protected void writeMethodPayload(ByteBuffer buffer) + { + } + + protected byte getType() + { + return 0; + } + + protected int getSize() + { + return 0; + } + + protected void writePayload(ByteBuffer buffer) + { + } + + protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException + { + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + } + + public boolean equals(Object o) + { + return o instanceof TestMethod && id.equals(((TestMethod) o).id); + } + + public int hashCode() + { + return id.hashCode(); + } + + } + + private static class GroupResponseValidator implements GroupResponseHandler + { + private final AMQMethodBody _response; + private final List<Member> _members; + private boolean _completed = false; + + GroupResponseValidator(AMQMethodBody response, List<Member> members) + { + _response = response; + _members = members; + } + + public void response(List<AMQMethodBody> responses, List<Member> members) + { + for (AMQMethodBody r : responses) + { + assertEquals(_response, r); + } + assertEquals(_members, members); + _completed = true; + } + + boolean isCompleted() + { + return _completed; + } + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/test/org/apache/qpid/server/cluster/ClusterCapabilityTest.java new file mode 100644 index 0000000000..132ebd8ca0 --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/ClusterCapabilityTest.java @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class ClusterCapabilityTest +{ + @Test + public void startWithNull() + { + MemberHandle peer = new SimpleMemberHandle("myhost:9999"); + String c = ClusterCapability.add(null, peer); + assertTrue(ClusterCapability.contains(c)); + assertTrue(peer.matches(ClusterCapability.getPeer(c))); + } + + @Test + public void startWithText() + { + MemberHandle peer = new SimpleMemberHandle("myhost:9999"); + String c = ClusterCapability.add("existing text", peer); + assertTrue(ClusterCapability.contains(c)); + assertTrue(peer.matches(ClusterCapability.getPeer(c))); + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/InductionBufferTest.java b/java/cluster/test/org/apache/qpid/server/cluster/InductionBufferTest.java new file mode 100644 index 0000000000..fedf47d49a --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/InductionBufferTest.java @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; + +import java.util.List; +import java.util.ArrayList; + +import junit.framework.TestCase; + +public class InductionBufferTest extends TestCase +{ + public void test() throws Exception + { + IoSession session1 = new TestSession(); + IoSession session2 = new TestSession(); + IoSession session3 = new TestSession(); + + TestMessageHandler handler = new TestMessageHandler(); + InductionBuffer buffer = new InductionBuffer(handler); + + buffer.receive(session1, "one"); + buffer.receive(session2, "two"); + buffer.receive(session3, "three"); + + buffer.receive(session1, "four"); + buffer.receive(session1, "five"); + buffer.receive(session1, "six"); + + buffer.receive(session3, "seven"); + buffer.receive(session3, "eight"); + + handler.checkEmpty(); + buffer.deliver(); + + handler.check(session1, "one"); + handler.check(session2, "two"); + handler.check(session3, "three"); + + handler.check(session1, "four"); + handler.check(session1, "five"); + handler.check(session1, "six"); + + handler.check(session3, "seven"); + handler.check(session3, "eight"); + handler.checkEmpty(); + + buffer.receive(session1, "nine"); + buffer.receive(session2, "ten"); + buffer.receive(session3, "eleven"); + + handler.check(session1, "nine"); + handler.check(session2, "ten"); + handler.check(session3, "eleven"); + + handler.checkEmpty(); + } + + private static class TestMessageHandler implements InductionBuffer.MessageHandler + { + private final List<IoSession> _sessions = new ArrayList<IoSession>(); + private final List<Object> _msgs = new ArrayList<Object>(); + + public synchronized void deliver(IoSession session, Object msg) throws Exception + { + _sessions.add(session); + _msgs.add(msg); + } + + void check(IoSession actualSession, Object actualMsg) + { + assertFalse(_sessions.isEmpty()); + assertFalse(_msgs.isEmpty()); + IoSession expectedSession = _sessions.remove(0); + Object expectedMsg = _msgs.remove(0); + assertEquals(expectedSession, actualSession); + assertEquals(expectedMsg, actualMsg); + } + + void checkEmpty() + { + assertTrue(_sessions.isEmpty()); + assertTrue(_msgs.isEmpty()); + } + } +} + diff --git a/java/cluster/test/org/apache/qpid/server/cluster/RecordingBroker.java b/java/cluster/test/org/apache/qpid/server/cluster/RecordingBroker.java new file mode 100644 index 0000000000..388d584288 --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/RecordingBroker.java @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; + +import java.util.ArrayList; +import java.util.List; + +class RecordingBroker extends TestBroker +{ + private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>(); + + RecordingBroker(String host, int port) + { + super(host, port); + } + + public void send(AMQDataBlock data) throws AMQException + { + _messages.add(data); + } + + List<AMQDataBlock> getMessages() + { + return _messages; + } + + void clear() + { + _messages.clear(); + } + +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/java/cluster/test/org/apache/qpid/server/cluster/RecordingBrokerFactory.java new file mode 100644 index 0000000000..e5e95323af --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/RecordingBrokerFactory.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +class RecordingBrokerFactory implements BrokerFactory +{ + public Broker create(MemberHandle handle) + { + return new RecordingBroker(handle.getHost(), handle.getPort()); + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/test/org/apache/qpid/server/cluster/SimpleClusterTest.java new file mode 100644 index 0000000000..a4d13ea46d --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/SimpleClusterTest.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.junit.Test; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; + +import javax.jms.JMSException; + +public class SimpleClusterTest +{ + @Test + public void declareExchange() throws AMQException, JMSException, URLSyntaxException + { + AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test"); + AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + System.out.println("Session created"); + session.declareExchange("my_exchange", "direct"); + System.out.println("Exchange declared"); + con.close(); + System.out.println("Connection closed"); + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/java/cluster/test/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java new file mode 100644 index 0000000000..f7c728759b --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java @@ -0,0 +1,59 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class SimpleMemberHandleTest +{ + @Test + public void matches() + { + assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888)); + assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888)); + assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888)); + } + + + @Test + public void resolve() + { + assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000")); + } + + private void assertEquivalent(MemberHandle a, MemberHandle b) + { + String msg = a + " is not equivalent to " + b; + a = SimpleMemberHandle.resolve(a); + b = SimpleMemberHandle.resolve(b); + msg += "(" + a + " does not match " + b + ")"; + assertTrue(msg, a.matches(b)); + } + + private void assertMatch(MemberHandle a, MemberHandle b) + { + assertTrue(a + " does not match " + b, a.matches(b)); + } + + private void assertNoMatch(MemberHandle a, MemberHandle b) + { + assertFalse(a + " matches " + b, a.matches(b)); + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestBroker.java b/java/cluster/test/org/apache/qpid/server/cluster/TestBroker.java new file mode 100644 index 0000000000..c4a1985ae3 --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/TestBroker.java @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; + +import java.io.IOException; + +class TestBroker extends Broker +{ + TestBroker(String host, int port) + { + super(host, port); + } + + boolean connect() throws IOException, InterruptedException + { + return true; + } + + void connectAsynch(Iterable<AMQMethodBody> msgs) + { + replay(msgs); + } + + void replay(Iterable<AMQMethodBody> msgs) + { + try + { + for (AMQMethodBody b : msgs) + { + send(new AMQFrame(0, b)); + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + Broker connectToCluster() throws IOException, InterruptedException + { + return this; + } + + public void send(AMQDataBlock data) throws AMQException + { + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestBrokerFactory.java b/java/cluster/test/org/apache/qpid/server/cluster/TestBrokerFactory.java new file mode 100644 index 0000000000..cd4e340925 --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/TestBrokerFactory.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +class TestBrokerFactory implements BrokerFactory +{ + public Broker create(MemberHandle handle) + { + return new TestBroker(handle.getHost(), handle.getPort()); + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestReplayManager.java b/java/cluster/test/org/apache/qpid/server/cluster/TestReplayManager.java new file mode 100644 index 0000000000..e2d6f75f19 --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/TestReplayManager.java @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.replay.ReplayManager; + +import java.util.ArrayList; +import java.util.List; + +class TestReplayManager implements ReplayManager +{ + private final List<AMQMethodBody> _msgs; + + TestReplayManager() + { + this(new ArrayList<AMQMethodBody>()); + } + + TestReplayManager(List<AMQMethodBody> msgs) + { + _msgs = msgs; + } + + public List<AMQMethodBody> replay(boolean isLeader) + { + return _msgs; + } +} diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java new file mode 100644 index 0000000000..c66fe27968 --- /dev/null +++ b/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java @@ -0,0 +1,261 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.*; + +import java.net.SocketAddress; +import java.util.Set; + +class TestSession implements IoSession +{ + public IoService getService() + { + return null; //TODO + } + + public IoHandler getHandler() + { + return null; //TODO + } + + public IoSessionConfig getConfig() + { + return null; //TODO + } + + public IoFilterChain getFilterChain() + { + return null; //TODO + } + + public WriteFuture write(Object message) + { + return null; //TODO + } + + public CloseFuture close() + { + return null; //TODO + } + + public Object getAttachment() + { + return null; //TODO + } + + public Object setAttachment(Object attachment) + { + return null; //TODO + } + + public Object getAttribute(String key) + { + return null; //TODO + } + + public Object setAttribute(String key, Object value) + { + return null; //TODO + } + + public Object setAttribute(String key) + { + return null; //TODO + } + + public Object removeAttribute(String key) + { + return null; //TODO + } + + public boolean containsAttribute(String key) + { + return false; //TODO + } + + public Set getAttributeKeys() + { + return null; //TODO + } + + public TransportType getTransportType() + { + return null; //TODO + } + + public boolean isConnected() + { + return false; //TODO + } + + public boolean isClosing() + { + return false; //TODO + } + + public CloseFuture getCloseFuture() + { + return null; //TODO + } + + public SocketAddress getRemoteAddress() + { + return null; //TODO + } + + public SocketAddress getLocalAddress() + { + return null; //TODO + } + + public SocketAddress getServiceAddress() + { + return null; //TODO + } + + public int getIdleTime(IdleStatus status) + { + return 0; //TODO + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //TODO + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //TODO + } + + public int getWriteTimeout() + { + return 0; //TODO + } + + public long getWriteTimeoutInMillis() + { + return 0; //TODO + } + + public void setWriteTimeout(int writeTimeout) + { + //TODO + } + + public TrafficMask getTrafficMask() + { + return null; //TODO + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //TODO + } + + public void suspendRead() + { + //TODO + } + + public void suspendWrite() + { + //TODO + } + + public void resumeRead() + { + //TODO + } + + public void resumeWrite() + { + //TODO + } + + public long getReadBytes() + { + return 0; //TODO + } + + public long getWrittenBytes() + { + return 0; //TODO + } + + public long getReadMessages() + { + return 0; + } + + public long getWrittenMessages() + { + return 0; + } + + public long getWrittenWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //TODO + } + + public long getLastIoTime() + { + return 0; //TODO + } + + public long getLastReadTime() + { + return 0; //TODO + } + + public long getLastWriteTime() + { + return 0; //TODO + } + + public boolean isIdle(IdleStatus status) + { + return false; //TODO + } + + public int getIdleCount(IdleStatus status) + { + return 0; //TODO + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //TODO + } +} |
