diff options
| author | Alex Rudyy <orudyy@apache.org> | 2015-04-15 09:47:28 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2015-04-15 09:47:28 +0000 |
| commit | 0a0baee45ebcff44635907d457c4ff6810b09c87 (patch) | |
| tree | 8bfb0f9eddbc23cff88af69be80ab3ce7d47011c /qpid/java/bdbstore/src/test | |
| parent | 54aa3d7070da16ce55c28ccad3f7d0871479e461 (diff) | |
| download | qpid-python-0a0baee45ebcff44635907d457c4ff6810b09c87.tar.gz | |
QPID-6481: Move java source tree to top level
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1673693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
31 files changed, 0 insertions, 6175 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java deleted file mode 100644 index 67804dc234..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.store.berkeleydb; - -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import junit.framework.TestCase; - -import org.apache.qpid.framing.AMQShortString; - -/** - * Tests for {@code AMQShortStringEncoding} including corner cases when string - * is null or over 127 characters in length - */ -public class AMQShortStringEncodingTest extends TestCase -{ - - public void testWriteReadNullValues() - { - // write into tuple output - TupleOutput tupleOutput = new TupleOutput(); - AMQShortStringEncoding.writeShortString(null, tupleOutput); - byte[] data = tupleOutput.getBufferBytes(); - - // read from tuple input - TupleInput tupleInput = new TupleInput(data); - AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); - assertNull("Expected null but got " + result, result); - } - - public void testWriteReadShortStringWithLengthOver127() - { - AMQShortString value = createString('a', 128); - - // write into tuple output - TupleOutput tupleOutput = new TupleOutput(); - AMQShortStringEncoding.writeShortString(value, tupleOutput); - byte[] data = tupleOutput.getBufferBytes(); - - // read from tuple input - TupleInput tupleInput = new TupleInput(data); - AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); - assertEquals("Expected " + value + " but got " + result, value, result); - } - - public void testWriteReadShortStringWithLengthLess127() - { - AMQShortString value = new AMQShortString("test"); - - // write into tuple output - TupleOutput tupleOutput = new TupleOutput(); - AMQShortStringEncoding.writeShortString(value, tupleOutput); - byte[] data = tupleOutput.getBufferBytes(); - - // read from tuple input - TupleInput tupleInput = new TupleInput(data); - AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); - assertEquals("Expected " + value + " but got " + result, value, result); - } - - private AMQShortString createString(char ch, int length) - { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < length; i++) - { - sb.append(ch); - } - return new AMQShortString(sb.toString()); - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java deleted file mode 100644 index ee990d3211..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ /dev/null @@ -1,795 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.mockito.Mockito.when; - -import java.io.File; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import com.sleepycat.je.Durability; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; - -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.model.AbstractConfiguredObject; -import org.apache.qpid.server.model.ConfigurationChangeListener; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.RemoteReplicationNode; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeTestHelper; -import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole; -import org.apache.qpid.test.utils.PortHelper; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.TestFileUtils; -import org.apache.qpid.util.FileUtils; - -public class BDBHAVirtualHostNodeTest extends QpidTestCase -{ - private BDBHAVirtualHostNodeTestHelper _helper; - private PortHelper _portHelper = new PortHelper(); - - @Override - protected void setUp() throws Exception - { - super.setUp(); - - _helper = new BDBHAVirtualHostNodeTestHelper(getTestName()); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _helper.tearDown(); - } - finally - { - super.tearDown(); - } - - _portHelper.waitUntilAllocatedPortsAreFree(); - } - - public void testCreateAndActivateVirtualHostNode() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - String messageStorePath = (String)attributes.get(BDBHAVirtualHostNode.STORE_PATH); - String repStreamTimeout = "2 h"; - Map<String,String> context = (Map<String,String>)attributes.get(BDBHAVirtualHostNode.CONTEXT); - context.put(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout); - BDBHAVirtualHostNode<?> node = _helper.createHaVHN(attributes); - - node.start(); - _helper.assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA); - - assertEquals("Unexpected node state", State.ACTIVE, node.getState()); - - DurableConfigurationStore store = node.getConfigurationStore(); - assertNotNull(store); - - BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) store; - ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment(); - ReplicationConfig replicationConfig = environment.getRepConfig(); - - assertEquals(nodeName, environment.getNodeName()); - assertEquals(groupName, environment.getGroup().getName()); - assertEquals(helperAddress, replicationConfig.getNodeHostPort()); - assertEquals(helperAddress, replicationConfig.getHelperHosts()); - - assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString()); - assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); - - _helper.awaitForVirtualhost(node, 30000); - VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost(); - assertNotNull("Virtual host child was not added", virtualHost); - assertEquals("Unexpected virtual host name", groupName, virtualHost.getName()); - assertEquals("Unexpected virtual host store", bdbConfigurationStore.getMessageStore(), virtualHost.getMessageStore()); - assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState()); - - node.stop(); - assertEquals("Unexpected state returned after stop", State.STOPPED, node.getState()); - assertEquals("Unexpected state", State.STOPPED, node.getState()); - - assertNull("Virtual host is not destroyed", node.getVirtualHost()); - - node.delete(); - assertEquals("Unexpected state returned after delete", State.DELETED, node.getState()); - assertEquals("Unexpected state", State.DELETED, node.getState()); - assertFalse("Store still exists " + messageStorePath, new File(messageStorePath).exists()); - } - - public void testMutableAttributes() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(attributes); - - BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) node.getConfigurationStore(); - ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment(); - - assertEquals("Unexpected node priority value before mutation", 1, environment.getRepMutableConfig().getNodePriority()); - assertFalse("Unexpected designated primary value before mutation", environment.getRepMutableConfig().getDesignatedPrimary()); - assertEquals("Unexpected electable group override value before mutation", 0, environment.getRepMutableConfig().getElectableGroupSizeOverride()); - - node.setAttribute(BDBHAVirtualHostNode.PRIORITY, 1, 2); - node.setAttribute(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, false, true); - node.setAttribute(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 0, 1); - - assertEquals("Unexpected node priority value after mutation", 2, environment.getRepMutableConfig().getNodePriority()); - assertTrue("Unexpected designated primary value after mutation", environment.getRepMutableConfig().getDesignatedPrimary()); - assertEquals("Unexpected electable group override value after mutation", 1, environment.getRepMutableConfig().getElectableGroupSizeOverride()); - - assertNotNull("Join time should be set", node.getJoinTime()); - assertNotNull("Last known replication transaction id should be set", node.getLastKnownReplicationTransactionId()); - } - - public void testTransferMasterToSelf() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - int node3PortNumber = _portHelper.getNextAvailable(); - - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber); - _helper.createAndStartHaVHN(node1Attributes); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - _helper.createAndStartHaVHN(node2Attributes); - - Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName); - _helper.createAndStartHaVHN(node3Attributes); - - BDBHAVirtualHostNode<?> replica = _helper.awaitAndFindNodeInRole(NodeRole.REPLICA); - - replica.setAttribute(BDBHAVirtualHostNode.ROLE, replica.getRole(), NodeRole.MASTER); - - _helper.assertNodeRole(replica, NodeRole.MASTER); - } - - public void testTransferMasterToRemoteReplica() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - int node3PortNumber = _portHelper.getNextAvailable(); - - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, - helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber); - BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes); - - final AtomicReference<RemoteReplicationNode<?>> lastSeenReplica = new AtomicReference<>(); - final CountDownLatch remoteNodeLatch = new CountDownLatch(2); - node1.addChangeListener(new NoopConfigurationChangeListener() - { - @Override - public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) - { - if (child instanceof RemoteReplicationNode) - { - remoteNodeLatch.countDown(); - lastSeenReplica.set((RemoteReplicationNode<?>)child); - } - } - }); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes); - - Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName); - BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes); - - assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS)); - - BDBHARemoteReplicationNodeImpl replicaRemoteNode = (BDBHARemoteReplicationNodeImpl)lastSeenReplica.get(); - _helper.awaitForAttributeChange(replicaRemoteNode, BDBHARemoteReplicationNodeImpl.ROLE, NodeRole.REPLICA); - - replicaRemoteNode.setAttributes(Collections.<String,Object>singletonMap(BDBHARemoteReplicationNode.ROLE, NodeRole.MASTER)); - - BDBHAVirtualHostNode<?> replica = replicaRemoteNode.getName().equals(node2.getName())? node2 : node3; - _helper.assertNodeRole(replica, NodeRole.MASTER); - } - - public void testMutatingRoleWhenNotReplica_IsDisallowed() throws Exception - { - int nodePortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + nodePortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, nodePortNumber); - BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(node1Attributes); - _helper.assertNodeRole(node, NodeRole.MASTER); - - try - { - node.setAttributes(Collections.<String,Object>singletonMap(BDBHAVirtualHostNode.ROLE, NodeRole.REPLICA)); - fail("Role mutation should fail"); - } - catch(IllegalStateException e) - { - // PASS - } - } - - - public void testRemoveReplicaNode() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - int node3PortNumber = _portHelper.getNextAvailable(); - - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - assertTrue(_portHelper.isPortAvailable(node1PortNumber)); - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber); - _helper.createAndStartHaVHN(node1Attributes); - - assertTrue(_portHelper.isPortAvailable(node2PortNumber)); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - _helper.createAndStartHaVHN(node2Attributes); - - assertTrue(_portHelper.isPortAvailable(node3PortNumber)); - - Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName); - _helper.createAndStartHaVHN(node3Attributes); - - - BDBHAVirtualHostNode<?> master = _helper.awaitAndFindNodeInRole(NodeRole.MASTER); - _helper.awaitRemoteNodes(master, 2); - - BDBHAVirtualHostNode<?> replica = _helper.awaitAndFindNodeInRole(NodeRole.REPLICA); - _helper.awaitRemoteNodes(replica, 2); - - assertNotNull("Remote node " + replica.getName() + " is not found", _helper.findRemoteNode(master, replica.getName())); - replica.delete(); - - _helper.awaitRemoteNodes(master, 1); - - assertNull("Remote node " + replica.getName() + " is not found", _helper.findRemoteNode(master, replica.getName())); - } - - public void testSetSynchronizationPolicyAttributesOnVirtualHost() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> nodeAttributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNode<?> node = _helper.createHaVHN(nodeAttributes); - - node.start(); - _helper.assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA); - assertEquals("Unexpected node state", State.ACTIVE, node.getState()); - - _helper.awaitForVirtualhost(node,30000); - BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost(); - assertNotNull("Virtual host is not created", virtualHost); - - _helper.awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, true); - - assertEquals("Unexpected local transaction synchronization policy", "SYNC", virtualHost.getLocalTransactionSynchronizationPolicy()); - assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy()); - assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync()); - - Map<String, Object> virtualHostAttributes = new HashMap<String,Object>(); - virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "WRITE_NO_SYNC"); - virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC"); - virtualHost.setAttributes(virtualHostAttributes); - - virtualHost.stop(); - virtualHost.start(); - - assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy()); - assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy()); - assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync()); - try - { - virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID")); - fail("Invalid synchronization policy is set"); - } - catch(IllegalArgumentException e) - { - //pass - } - - try - { - virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID")); - fail("Invalid synchronization policy is set"); - } - catch(IllegalArgumentException e) - { - //pass - } - - } - - public void testNotPermittedNodeIsNotAllowedToConnect() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - int node3PortNumber = _portHelper.getNextAvailable(); - - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber); - BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes); - - Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName); - try - { - _helper.createHaVHN(node3Attributes); - fail("The VHN should not be permitted to join the group"); - } - catch(IllegalConfigurationException e) - { - assertEquals("Unexpected exception message", String.format("Node from '%s' is not permitted!", "localhost:" + node3PortNumber), e.getMessage()); - } - } - - public void testCurrentNodeCannotBeRemovedFromPermittedNodeList() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - int node3PortNumber = _portHelper.getNextAvailable(); - - String node1Address = "localhost:" + node1PortNumber; - String node2Address = "localhost:" + node2PortNumber; - String node3Address = "localhost:" + node3PortNumber; - - String groupName = "group"; - String node1Name = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(node1Name, groupName, node1Address, node1Address, node1Name, node1PortNumber, node2PortNumber, node3PortNumber); - BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, node2Address, node1Address, node1Name); - BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes); - - Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, node3Address, node1Address, node1Name); - BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes); - - _helper.awaitRemoteNodes(node1, 2); - - // Create new "proposed" permitted nodes list with a current node missing - List<String> amendedPermittedNodes = new ArrayList<String>(); - amendedPermittedNodes.add(node1Address); - amendedPermittedNodes.add(node2Address); - - // Try to update the permitted nodes attributes using the new list - try - { - node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes)); - fail("Operation to remove current group node from permitted nodes should have failed"); - } - catch(IllegalArgumentException e) - { - assertEquals("Unexpected exception message", String.format("The current group node '%s' cannot be removed from '%s' as its already a group member", node3Address, BDBHAVirtualHostNode.PERMITTED_NODES), e.getMessage()); - } - } - - public void testPermittedNodesAttributeModificationConditions() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - int node3PortNumber = _portHelper.getNextAvailable(); - int node4PortNumber = _portHelper.getNextAvailable(); - int node5PortNumber = _portHelper.getNextAvailable(); - - String node1Address = "localhost:" + node1PortNumber; - String node2Address = "localhost:" + node2PortNumber; - String node3Address = "localhost:" + node3PortNumber; - String node4Address = "localhost:" + node4PortNumber; - String node5Address = "localhost:" + node5PortNumber; - - String groupName = "group"; - String node1Name = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(node1Name, groupName, node1Address, node1Address, node1Name, node1PortNumber, node2PortNumber, node3PortNumber); - BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, node2Address, node1Address, node1Name); - BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes); - - Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, node3Address, node1Address, node1Name); - BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes); - - _helper.awaitRemoteNodes(node1, 2); - - // Create new "proposed" permitted nodes list for update - List<String> amendedPermittedNodes = new ArrayList<String>(); - amendedPermittedNodes.add(node1Address); - amendedPermittedNodes.add(node2Address); - amendedPermittedNodes.add(node3Address); - amendedPermittedNodes.add(node4Address); - - // Try to update the permitted nodes attributes using the new list on REPLICA - should fail - BDBHAVirtualHostNode<?> nonMasterNode = _helper.findNodeInRole(NodeRole.REPLICA); - try - { - nonMasterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes)); - fail("Operation to update permitted nodes should have failed from non MASTER node"); - } - catch(IllegalArgumentException e) - { - assertEquals("Unexpected exception message", String.format("Attribute '%s' can only be set on '%s' node or node in '%s' or '%s' state", BDBHAVirtualHostNode.PERMITTED_NODES, NodeRole.MASTER, State.STOPPED, State.ERRORED), e.getMessage()); - } - - // Try to update the permitted nodes attributes using the new list on MASTER - should succeed - BDBHAVirtualHostNode<?> masterNode = _helper.findNodeInRole(NodeRole.MASTER); - masterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes)); - - // Try to update the permitted nodes attributes using the new list on a STOPPED node - should succeed - nonMasterNode.stop(); - amendedPermittedNodes.add(node5Address); - nonMasterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes)); - } - - public void testIntruderProtection() throws Exception - { - int nodePortNumber = _portHelper.getNextAvailable(); - int intruderPortNumber = _portHelper.getNextAvailable(); - - String helperAddress = "localhost:" + nodePortNumber; - String groupName = "group"; - String nodeName = "node"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, nodePortNumber, intruderPortNumber); - BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(node1Attributes); - - Map<String, Object> intruderAttributes = _helper.createNodeAttributes("intruder", groupName, "localhost:" + intruderPortNumber, helperAddress, nodeName); - intruderAttributes.put(BDBHAVirtualHostNode.PRIORITY, 0); - BDBHAVirtualHostNode<?> intruder = _helper.createAndStartHaVHN(intruderAttributes); - - final CountDownLatch stopLatch = new CountDownLatch(1); - ConfigurationChangeListener listener = new NoopConfigurationChangeListener() - { - @Override - public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) - { - if (newState == State.ERRORED) - { - stopLatch.countDown(); - } - } - }; - node.addChangeListener(listener); - - List<String> permittedNodes = new ArrayList<String>(); - permittedNodes.add(helperAddress); - node.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes)); - - assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS)); - - // Try top re start the ERRORED node and ensure exception is thrown - try - { - node.start(); - fail("Restart of node should have thrown exception"); - } - catch (IllegalStateException ise) - { - assertEquals("Unexpected exception when restarting node post intruder detection", "Intruder node detected: " + "localhost:" + intruderPortNumber, ise.getMessage()); - } - _helper.awaitForAttributeChange(node, AbstractConfiguredObject.STATE, State.ERRORED); - } - - public void testIntruderProtectionInManagementMode() throws Exception - { - int nodePortNumber = _portHelper.getNextAvailable(); - int intruderPortNumber = _portHelper.getNextAvailable(); - - String helperAddress = "localhost:" + nodePortNumber; - String groupName = "group"; - String nodeName = "node"; - - Map<String, Object> nodeAttributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, nodePortNumber, intruderPortNumber); - BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(nodeAttributes); - - Map<String, Object> intruderAttributes = _helper.createNodeAttributes("intruder", groupName, "localhost:" + intruderPortNumber, helperAddress, nodeName); - intruderAttributes.put(BDBHAVirtualHostNode.PRIORITY, 0); - BDBHAVirtualHostNode<?> intruder = _helper.createAndStartHaVHN(intruderAttributes); - - final CountDownLatch stopLatch = new CountDownLatch(1); - ConfigurationChangeListener listener = new NoopConfigurationChangeListener() - { - @Override - public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) - { - if (newState == State.ERRORED) - { - stopLatch.countDown(); - } - } - }; - node.addChangeListener(listener); - - List<String> permittedNodes = new ArrayList<String>(); - permittedNodes.add(helperAddress); - node.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes)); - - assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS)); - - // test that if management mode is enabled then the node can start without exception - when(_helper.getBroker().isManagementMode()).thenReturn(true); - node.start(); - - _helper.awaitForAttributeChange(node, AbstractConfiguredObject.STATE, State.ERRORED); - } - - public void testPermittedNodesChangedOnReplicaNodeOnlyOnceAfterBeingChangedOnMaster() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber); - BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - node2Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0); - BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes); - assertEquals("Unexpected role", NodeRole.REPLICA, node2.getRole()); - _helper.awaitRemoteNodes(node2, 1); - - BDBHARemoteReplicationNode<?> remote = _helper.findRemoteNode(node2, node1.getName()); - - final AtomicInteger permittedNodesChangeCounter = new AtomicInteger(); - final CountDownLatch _permittedNodesLatch = new CountDownLatch(1); - node2.addChangeListener(new NoopConfigurationChangeListener() - { - @Override - public void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue, Object newAttributeValue) - { - if (attributeName.equals(BDBHAVirtualHostNode.PERMITTED_NODES)) - { - permittedNodesChangeCounter.incrementAndGet(); - _permittedNodesLatch.countDown(); - } - } - }); - List<String> permittedNodes = new ArrayList<>(node1.getPermittedNodes()); - permittedNodes.add("localhost:5000"); - node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes)); - - assertTrue("Permitted nodes were not changed on Replica", _permittedNodesLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Not the same permitted nodes", new HashSet<>(node1.getPermittedNodes()), new HashSet<>(node2.getPermittedNodes())); - assertEquals("Unexpected counter of changes permitted nodes", 1, permittedNodesChangeCounter.get()); - - // change the order of permitted nodes - Collections.swap(permittedNodes, 0, 2); - node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes)); - - // make sure that node2 onNodeState was invoked by performing transaction on master and making sure that it was replicated - performTransactionAndAwaitForRemoteNodeToGetAware(node1, remote); - - // perform transaction second time because permitted nodes are changed after last transaction id - performTransactionAndAwaitForRemoteNodeToGetAware(node1, remote); - assertEquals("Unexpected counter of changes permitted nodes", 1, permittedNodesChangeCounter.get()); - } - - private void performTransactionAndAwaitForRemoteNodeToGetAware(BDBHAVirtualHostNode<?> node1, BDBHARemoteReplicationNode<?> remote) throws InterruptedException - { - new DatabasePinger().pingDb(((BDBConfigurationStore)node1.getConfigurationStore()).getEnvironmentFacade()); - - int waitCounter = 100; - while ( remote.getLastKnownReplicationTransactionId() != node1.getLastKnownReplicationTransactionId() && (waitCounter--) != 0) - { - Thread.sleep(100l); - } - assertEquals("Last transaction was not replicated", new Long(remote.getLastKnownReplicationTransactionId()), node1.getLastKnownReplicationTransactionId() ); - } - - public void testIntruderConnected() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes); - - final CountDownLatch stopLatch = new CountDownLatch(1); - ConfigurationChangeListener listener = new NoopConfigurationChangeListener() - { - @Override - public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) - { - if (newState == State.ERRORED) - { - stopLatch.countDown(); - } - } - }; - node1.addChangeListener(listener); - - String node2Name = "node2"; - File environmentPathFile = new File(_helper.getMessageStorePath() + File.separator + node2Name); - environmentPathFile.mkdirs(); - - ReplicationConfig replicationConfig = new ReplicationConfig(groupName, node2Name, "localhost:" + node2PortNumber ); - replicationConfig.setHelperHosts(helperAddress); - EnvironmentConfig envConfig = new EnvironmentConfig(); - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setDurability(Durability.parse((String) node1Attributes.get(BDBHAVirtualHostNode.DURABILITY))); - - ReplicatedEnvironment intruder = null; - String originalThreadName = Thread.currentThread().getName(); - try - { - intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); - } - finally - { - try - { - if (intruder != null) - { - intruder.close(); - } - } - finally - { - Thread.currentThread().setName(originalThreadName); - } - } - - assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(20, TimeUnit.SECONDS)); - } - - public void testValidateOnCreateForNonExistingHelperNode() throws Exception - { - int node1PortNumber = findFreePort(); - int node2PortNumber = getNextAvailable(node1PortNumber + 1); - - - Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber, - "localhost:" + node2PortNumber, "node2", node1PortNumber, node1PortNumber, node2PortNumber); - try - { - _helper.createAndStartHaVHN(attributes); - fail("Node creation should fail because of invalid helper address"); - } - catch(IllegalConfigurationException e) - { - assertEquals("Unexpected exception on connection to non-existing helper address", - String.format("Cannot connect to existing node '%s' at '%s'", "node2", "localhost:" + node2PortNumber), e.getMessage()); - } - } - - public void testValidateOnCreateForAlreadyBoundAddress() throws Exception - { - int node1PortNumber = findFreePort(); - - ServerSocket serverSocket = null; - try - { - serverSocket = new ServerSocket(); - serverSocket.setReuseAddress(true); - serverSocket.bind(new InetSocketAddress("localhost", node1PortNumber)); - - - Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber, - "localhost:" + node1PortNumber, "node2", node1PortNumber, node1PortNumber); - try - { - _helper.createAndStartHaVHN(attributes); - fail("Node creation should fail because of invalid address"); - } - catch(IllegalConfigurationException e) - { - assertEquals("Unexpected exception on attempt to create node with already bound address", - String.format("Cannot bind to address '%s'. Address is already in use.", "localhost:" + node1PortNumber), e.getMessage()); - } - } - finally - { - if (serverSocket != null) - { - serverSocket.close(); - } - } - } - - public void testValidateOnCreateForInvalidStorePath() throws Exception - { - int node1PortNumber = findFreePort(); - - File storeBaseFolder = TestFileUtils.createTestDirectory(); - File file = new File(storeBaseFolder, getTestName()); - file.createNewFile(); - File storePath = new File(file, "test"); - try - { - Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber, - "localhost:" + node1PortNumber, "node2", node1PortNumber, node1PortNumber); - attributes.put(BDBHAVirtualHostNode.STORE_PATH, storePath.getAbsoluteFile()); - try - { - _helper.createAndStartHaVHN(attributes); - fail("Node creation should fail because of invalid store path"); - } - catch (IllegalConfigurationException e) - { - assertEquals("Unexpected exception on attempt to create environment in invalid location", - String.format("Store path '%s' is not a folder", storePath.getAbsoluteFile()), e.getMessage()); - } - } - finally - { - FileUtils.delete(storeBaseFolder, true); - } - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java deleted file mode 100644 index a58b1e7c2e..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.qpid.server.model.ConfiguredObjectFactory; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.AbstractDurableConfigurationStoreTestCase; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode; - -public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase -{ - @Override - protected VirtualHostNode createVirtualHostNode(String storeLocation, ConfiguredObjectFactory factory) - { - final BDBVirtualHostNode parent = mock(BDBVirtualHostNode.class); - when(parent.getStorePath()).thenReturn(storeLocation); - return parent; - } - - @Override - protected DurableConfigurationStore createConfigStore() throws Exception - { - return new BDBConfigurationStore(VirtualHost.class); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java deleted file mode 100644 index ee0edb5e59..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.Map; - -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost; - -public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase -{ - /* - * Notes on calculation of quota limits. - * - * 150 32kb messages is approximately 4.8MB which is greater than - * OVERFULL_SIZE. - * - * We deliberately use settings that force BDB to use multiple log files, so - * that when one or more of them are subsequently cleaned (following message - * consumption) the actual size on disk is reduced. - */ - - private static final String MAX_BDB_LOG_SIZE = "1000000"; // ~1MB - - private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 150; - - private static final long OVERFULL_SIZE = 4000000; // ~4MB - private static final long UNDERFULL_SIZE = 3500000; // ~3.5MB - - @Override - protected int getNumberOfMessagesToFillStore() - { - return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; - } - - @Override - protected VirtualHost createVirtualHost(String storeLocation) - { - final BDBVirtualHost parent = mock(BDBVirtualHost.class); - Map<String, String> contextMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); - when(parent.getContext()).thenReturn(contextMap); - when(parent.getContextKeys(false)).thenReturn(contextMap.keySet()); - when(parent.getContextValue(eq(String.class),eq("je.log.fileMax"))).thenReturn(MAX_BDB_LOG_SIZE); - when(parent.getStorePath()).thenReturn(storeLocation); - when(parent.getStoreOverfullSize()).thenReturn(OVERFULL_SIZE); - when(parent.getStoreUnderfullSize()).thenReturn(UNDERFULL_SIZE); - return parent; - } - - - @Override - protected MessageStore createStore() throws Exception - { - MessageStore store = new BDBMessageStore(); - return store; - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java deleted file mode 100644 index 3f8c1a7a99..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ /dev/null @@ -1,415 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.nio.ByteBuffer; -import java.util.Arrays; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MessagePublishInfo; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; -import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; -import org.apache.qpid.server.store.MessageHandle; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreTestCase; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.util.FileUtils; - -/** - * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against - * the BDB Store as well as additional tests specific to the BDB store-implementation. - */ -public class BDBMessageStoreTest extends MessageStoreTestCase -{ - private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - - private String _storeLocation; - - @Override - protected void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - deleteStoreIfExists(); - } - } - - /** - * Tests that message metadata and content are successfully read back from a - * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to - * verify their ability to co-exist within the store and be successful retrieved. - */ - public void testBDBMessagePersistence() throws Exception - { - MessageStore bdbStore = getStore(); - - // Create content ByteBuffers. - // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. - // Use a single chunk for the 0-10 message as per broker behaviour. - String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; - - ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); - ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); - - ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); - int bodySize = completeContentBody_0_10.limit(); - - /* - * Create and insert a 0-8 message (metadata and multi-chunk content) - */ - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - MessageHandle<MessageMetaData> messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8); - - long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - - messageHandle_0_8.addContent(firstContentBytes_0_8); - messageHandle_0_8.addContent(secondContentBytes_0_8); - final StoredMessage<MessageMetaData> storedMessage_0_8 = messageHandle_0_8.allContentAdded(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore(); - - /* - * Create and insert a 0-10 message (metadata and content) - */ - MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); - DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); - Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); - - MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); - - MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10); - - long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - - messageHandle_0_10.addContent(completeContentBody_0_10); - final StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = messageHandle_0_10.allContentAdded(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore(); - - /* - * reload the store only (read-only) - */ - reopenStore(); - - /* - * Read back and validate the 0-8 message metadata and content - */ - BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore(); - StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8); - - assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); - MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; - - assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); - - MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); - assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); - assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); - assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); - assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); - - ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); - assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId()); - assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); - assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); - - BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties(); - assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); - assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); - - ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ; - long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8); - assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8); - String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); - - /* - * Read back and validate the 0-10 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10); - - assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); - MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; - - assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); - - DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); - assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); - assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); - assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); - assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); - assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); - assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); - - MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); - assertNotNull("MessageProperties were not returned", returnedMsgProps); - assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); - assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); - assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); - - ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; - long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent); - assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); - - String returnedPayloadString_0_10 = new String(recoveredContent.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); - - reopenedBdbStore.closeMessageStore(); - } - - private DeliveryProperties createDeliveryProperties_0_10() - { - DeliveryProperties delProps_0_10 = new DeliveryProperties(); - - delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); - delProps_0_10.setImmediate(true); - delProps_0_10.setExchange("exchange12345"); - delProps_0_10.setRoutingKey("routingKey12345"); - delProps_0_10.setExpiration(5); - delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); - - return delProps_0_10; - } - - private MessageProperties createMessageProperties_0_10(int bodySize) - { - MessageProperties msgProps_0_10 = new MessageProperties(); - msgProps_0_10.setContentLength(bodySize); - msgProps_0_10.setCorrelationId("qwerty".getBytes()); - msgProps_0_10.setContentType("text/html"); - - return msgProps_0_10; - } - - - private MessagePublishInfo createPublishInfoBody_0_8() - { - return new MessagePublishInfo(new AMQShortString("exchange12345"), false, true, - new AMQShortString("routingKey12345")); - - } - - private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) - { - return new ContentHeaderBody(props, length); - } - - private BasicContentHeaderProperties createContentHeaderProperties_0_8() - { - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); - props.setContentType("text/html"); - props.getHeaders().setString("Test", "MST"); - return props; - } - - public void testGetContentWithOffset() throws Exception - { - BDBMessageStore bdbStore = (BDBMessageStore) getStore(); - StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - // normal case: offset is 0 - ByteBuffer dst = ByteBuffer.allocate(10); - int length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", CONTENT_BYTES.length, length); - byte[] array = dst.array(); - assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); - - // offset is in the middle - dst = ByteBuffer.allocate(10); - length = bdbStore.getContent(messageid_0_8, 5, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - byte[] expected = new byte[10]; - System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // offset beyond the content length - dst = ByteBuffer.allocate(10); - try - { - bdbStore.getContent(messageid_0_8, 15, dst); - fail("Should fail for the offset greater than message size"); - } - catch (RuntimeException e) - { - assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " - + messageid_0_8 + "!", e.getCause().getMessage()); - } - - // buffer is smaller then message size - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // buffer is smaller then message size, offset is not 0 - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 2, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - } - - /** - * Tests that messages which are added to the store and then removed using the - * public MessageStore interfaces are actually removed from the store by then - * interrogating the store with its own implementation methods and verifying - * expected exceptions are thrown to indicate the message is not present. - */ - public void testMessageCreationAndRemoval() throws Exception - { - BDBMessageStore bdbStore = (BDBMessageStore) getStore(); - - StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - bdbStore.removeMessage(messageid_0_8, true); - - //verify the removal using the BDB store implementation methods directly - try - { - // the next line should throw since the message id should not be found - bdbStore.getMessageMetaData(messageid_0_8); - fail("No exception thrown when message id not found getting metadata"); - } - catch (StoreException e) - { - // pass since exception expected - } - - //expecting no content, allocate a 1 byte - ByteBuffer dst = ByteBuffer.allocate(1); - - assertEquals("Retrieved content when none was expected", - 0, bdbStore.getContent(messageid_0_8, 0, dst)); - } - - private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store) - { - ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES); - - int bodySize = CONTENT_BYTES.length; - - //create and store the message using the MessageStore interface - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - MessageHandle<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); - - storedMessage_0_8.addContent(chunk1); - ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); - - return storedMessage_0_8.allContentAdded(); - } - - public void testOnDelete() throws Exception - { - String storeLocation = getStore().getStoreLocation(); - - File location = new File(storeLocation); - assertTrue("Store does not exist at " + storeLocation, location.exists()); - - getStore().closeMessageStore(); - assertTrue("Store does not exist at " + storeLocation, location.exists()); - - BDBVirtualHost mockVH = mock(BDBVirtualHost.class); - String testLocation = getStore().getStoreLocation(); - when(mockVH.getStorePath()).thenReturn(testLocation); - - getStore().onDelete(mockVH); - - assertFalse("Store exists at " + storeLocation, location.exists()); - } - - - @Override - protected VirtualHost createVirtualHost() - { - _storeLocation = TMP_FOLDER + File.separator + getTestName(); - deleteStoreIfExists(); - - final BDBVirtualHost parent = mock(BDBVirtualHost.class); - when(parent.getStorePath()).thenReturn(_storeLocation); - return parent; - } - - private void deleteStoreIfExists() - { - if (_storeLocation != null) - { - File location = new File(_storeLocation); - if (location.exists()) - { - FileUtils.delete(location, true); - } - } - } - - @Override - protected MessageStore createMessageStore() - { - return new BDBMessageStore(); - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java deleted file mode 100644 index 4b5069fec8..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ /dev/null @@ -1,432 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.management.MBeanServerConnection; -import javax.management.ObjectName; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.url.URLSyntaxException; - -/** - * Prepares an older version brokers BDB store with the required - * contents for use in the BDBStoreUpgradeTest. - * - * NOTE: Must be used with the equivalent older version client! - * - * The store will then be used to verify that the upgraded is - * completed properly and that once upgraded it functions as - * expected with the new broker. - * - */ -public class BDBStoreUpgradeTestPreparer -{ - private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgradeTestPreparer.class); - - public static final String TOPIC_NAME="myUpgradeTopic"; - public static final String SUB_NAME="myDurSubName"; - public static final String SELECTOR_SUB_NAME="mySelectorDurSubName"; - public static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic"; - public static final String QUEUE_NAME="myUpgradeQueue"; - public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; - - public static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; - public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; - public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner"; - public static final String MISUSED_OWNER = "misused-owner-as-description"; - private static final String VIRTUAL_HOST_NAME = "test"; - private static final String SORTED_QUEUE_NAME = "mySortedQueue"; - private static final String SORT_KEY = "mySortKey"; - private static final String TEST_EXCHANGE_NAME = "myCustomExchange"; - private static final String TEST_QUEUE_NAME = "myCustomQueue"; - - private static AMQConnectionFactory _connFac; - private static final String CONN_URL = "amqp://guest:guest@clientid/" + VIRTUAL_HOST_NAME + "?brokerlist='tcp://localhost:5672'"; - - /** - * Create a BDBStoreUpgradeTestPreparer instance - */ - public BDBStoreUpgradeTestPreparer () throws URLSyntaxException - { - _connFac = new AMQConnectionFactory(CONN_URL); - } - - private void prepareBroker() throws Exception - { - prepareQueues(); - prepareNonDurableQueue(); - prepareDurableSubscriptionWithSelector(); - prepareDurableSubscriptionWithoutSelector(); - } - - private void prepareNonDurableQueue() throws Exception - { - Connection connection = _connFac.createConnection(); - AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - AMQShortString queueName = new AMQShortString(NON_DURABLE_QUEUE_NAME); - AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME); - session.sendCreateQueue(queueName, false, false, false, null); - session.bindQueue(queueName, queueName, null, new AMQShortString("amq.direct"), destination); - MessageProducer messageProducer = session.createProducer(destination); - sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3); - connection.close(); - } - - /** - * Prepare a queue for use in testing message and binding recovery - * after the upgrade is performed. - * - * - Create a transacted session on the connection. - * - Use a consumer to create the (durable by default) queue. - * - Send 5 large messages to test (multi-frame) content recovery. - * - Send 1 small message to test (single-frame) content recovery. - * - Commit the session. - * - Send 5 small messages to test that uncommitted messages are not recovered. - * following the upgrade. - * - Close the session. - */ - private void prepareQueues() throws Exception - { - // Create a connection - Connection connection = _connFac.createConnection(); - connection.start(); - connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException e) - { - _logger.error("Error setting exception listener for connection", e); - } - }); - // Create a session on the connection, transacted to confirm delivery - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue(QUEUE_NAME); - // Create a consumer to ensure the queue gets created - // (and enter it into the store, as queues are made durable by default) - MessageConsumer messageConsumer = session.createConsumer(queue); - messageConsumer.close(); - - // Create a Message priorityQueueProducer - MessageProducer messageProducer = session.createProducer(queue); - - // Publish 5 persistent messages, 256k chars to ensure they are multi-frame - sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5); - // Publish 5 persistent messages, 1k chars to ensure they are single-frame - sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); - - session.commit(); - - // Publish 5 persistent messages which will NOT be committed and so should be 'lost' - sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); - messageProducer.close(); - session.close(); - - session = connection.createSession(true, Session.SESSION_TRANSACTED); - // Create a priority queue on broker - final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>(); - priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES,10); - Queue priorityQueue = createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments); - MessageProducer priorityQueueProducer = session.createProducer(priorityQueue); - - for (int msg = 0; msg < 5; msg++) - { - priorityQueueProducer.setPriority(msg % 10); - Message message = session.createTextMessage(generateString(256*1024)); - message.setIntProperty("ID", msg); - priorityQueueProducer.send(message); - } - session.commit(); - priorityQueueProducer.close(); - - // Create a queue that has a DLQ - final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>(); - queueWithDLQArguments.put("x-qpid-dlq-enabled", true); - queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2); - createAndBindQueueOnBroker(session, QUEUE_WITH_DLQ_NAME, queueWithDLQArguments); - - // Send message to the DLQ - Queue dlq = session.createQueue("fanout://" + QUEUE_WITH_DLQ_NAME + "_DLE//does-not-matter"); - MessageProducer dlqMessageProducer = session.createProducer(dlq); - sendMessages(session, dlqMessageProducer, dlq, DeliveryMode.PERSISTENT, 1*1024, 1); - session.commit(); - - // Create a queue with JMX specifying an owner, so it can later be moved into description - createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments); - - createExchange(TEST_EXCHANGE_NAME, "direct"); - Queue customQueue = createAndBindQueueOnBroker(session, TEST_QUEUE_NAME, null, TEST_EXCHANGE_NAME, "direct"); - MessageProducer customQueueMessageProducer = session.createProducer(customQueue); - sendMessages(session, customQueueMessageProducer, customQueue, DeliveryMode.PERSISTENT, 1*1024, 1); - session.commit(); - customQueueMessageProducer.close(); - - prepareSortedQueue(session, SORTED_QUEUE_NAME, SORT_KEY); - - session.close(); - connection.close(); - } - - private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception - { - return createAndBindQueueOnBroker(session, queueName, arguments, "amq.direct", "direct"); - } - - private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments, String exchangeName, String exchangeType) throws Exception - { - ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); - Queue queue = session.createQueue("BURL:" + exchangeType + "://" + exchangeName + "/" + queueName + "/" + queueName + "?durable='true'"); - ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); - return queue; - } - - private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments) throws Exception - { - JMXConnector jmxConnector = createJMXConnector(); - try - { - MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection(); - ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\""); - - Object[] params = new Object[] {queueName, owner, true, arguments}; - String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()}; - mbsc.invoke(virtualHost, "createNewQueue", params, signature); - - ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\",name=\"amq.direct\",ExchangeType=direct"); - mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()}); - } - finally - { - jmxConnector.close(); - } - } - - private void createExchange(String exchangeName, String exchangeType) throws Exception - { - JMXConnector jmxConnector = createJMXConnector(); - try - { - MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection(); - ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\""); - - Object[] params = new Object[]{exchangeName, exchangeType, true}; - String[] signature = new String[]{String.class.getName(), String.class.getName(), boolean.class.getName()}; - mbsc.invoke(virtualHost, "createNewExchange", params, signature); - } - finally - { - jmxConnector.close(); - } - } - - private JMXConnector createJMXConnector() throws Exception - { - Map<String, Object> environment = new HashMap<>(); - environment.put(JMXConnector.CREDENTIALS, new String[] {"admin", "admin"}); - JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi"); - return JMXConnectorFactory.connect(url, environment); - } - - private void prepareSortedQueue(Session session, String queueName, String sortKey) throws Exception - { - final Map<String, Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.queue_sort_key", sortKey); - Queue sortedQueue = createAndBindQueueOnBroker(session, queueName, arguments); - - MessageProducer messageProducer2 = session.createProducer(sortedQueue); - - String[] sortKeys = {"c", "b", "e", "a", "d"}; - for (int i = 1; i <= sortKeys.length; i++) - { - Message message = session.createTextMessage(generateString(256*1024)); - message.setIntProperty("ID", i); - message.setStringProperty(sortKey, sortKeys[i - 1]); - messageProducer2.send(message); - } - session.commit(); - } - - /** - * Prepare a DurableSubscription backing queue for use in testing selector - * recovery and queue exclusivity marking during the upgrade process. - * - * - Create a transacted session on the connection. - * - Open and close a DurableSubscription with selector to create the backing queue. - * - Send a message which matches the selector. - * - Send a message which does not match the selector. - * - Send a message which matches the selector but will remain uncommitted. - * - Close the session. - */ - private void prepareDurableSubscriptionWithSelector() throws Exception - { - - // Create a connection - TopicConnection connection = _connFac.createTopicConnection(); - connection.start(); - connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException e) - { - _logger.error("Error setting exception listener for connection", e); - } - }); - // Create a session on the connection, transacted to confirm delivery - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Topic topic = session.createTopic(SELECTOR_TOPIC_NAME); - - // Create and register a durable subscriber with selector and then close it - TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); - durSub1.close(); - - // Create a publisher and send a persistent message which matches the selector - // followed by one that does not match, and another which matches but is not - // committed and so should be 'lost' - TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); - TopicPublisher publisher = pubSession.createPublisher(topic); - - publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); - publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); - pubSession.commit(); - publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); - - publisher.close(); - pubSession.close(); - connection.close(); - } - - /** - * Prepare a DurableSubscription backing queue for use in testing use of - * DurableSubscriptions without selectors following the upgrade process. - * - * - Create a transacted session on the connection. - * - Open and close a DurableSubscription without selector to create the backing queue. - * - Send a message which matches the subscription and commit session. - * - Close the session. - */ - private void prepareDurableSubscriptionWithoutSelector() throws Exception - { - // Create a connection - TopicConnection connection = _connFac.createTopicConnection(); - connection.start(); - connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException e) - { - _logger.error("Error setting exception listener for connection", e); - } - }); - // Create a session on the connection, transacted to confirm delivery - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Topic topic = session.createTopic(TOPIC_NAME); - - // Create and register a durable subscriber without selector and then close it - TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME); - durSub1.close(); - - // Create a publisher and send a persistent message which matches the subscription - TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); - TopicPublisher publisher = pubSession.createPublisher(topic); - - publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent"); - pubSession.commit(); - - publisher.close(); - pubSession.close(); - connection.close(); - } - - private static void sendMessages(Session session, MessageProducer messageProducer, - Destination dest, int deliveryMode, int length, int numMesages) throws JMSException - { - for (int i = 1; i <= numMesages; i++) - { - Message message = session.createTextMessage(generateString(length)); - message.setIntProperty("ID", i); - messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - } - } - - private static void publishMessages(Session session, TopicPublisher publisher, - Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException - { - for (int i = 1; i <= numMesages; i++) - { - Message message = session.createTextMessage(generateString(length)); - message.setIntProperty("ID", i); - message.setStringProperty("testprop", selectorProperty); - publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - } - } - - /** - * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. - * - * @param length number of characters in the string - * @return string sequence of the given length - */ - private static String generateString(int length) - { - char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; - char[] chars = new char[length]; - for (int i = 0; i < (length); i++) - { - chars[i] = base_chars[i % 10]; - } - return new String(chars); - } - - /** - * Run the preparation tool. - * @param args Command line arguments. - */ - public static void main(String[] args) throws Exception - { - System.setProperty("qpid.dest_syntax", "BURL"); - BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer(); - producer.prepareBroker(); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvHomeRegistryTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvHomeRegistryTest.java deleted file mode 100644 index 5de51df992..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvHomeRegistryTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; - -import junit.framework.TestCase; - -import org.apache.qpid.test.utils.QpidTestCase; - -public class EnvHomeRegistryTest extends TestCase -{ - - private final EnvHomeRegistry _ehr = new EnvHomeRegistry(); - - public void testDuplicateEnvHomeRejected() throws Exception - { - File home = new File(QpidTestCase.TMP_FOLDER, getName()); - - _ehr.registerHome(home); - try - { - _ehr.registerHome(home); - fail("Exception not thrown"); - } - catch (IllegalArgumentException iae) - { - // PASS - } - } - - public void testUniqueEnvHomesAllowed() throws Exception - { - File home1 = new File(QpidTestCase.TMP_FOLDER, getName() + "1"); - File home2 = new File(QpidTestCase.TMP_FOLDER, getName() + "2"); - - _ehr.registerHome(home1); - _ehr.registerHome(home2); - } - - public void testReuseOfEnvHomesAllowed() throws Exception - { - File home = new File(QpidTestCase.TMP_FOLDER, getName() + "1"); - - _ehr.registerHome(home); - - _ehr.deregisterHome(home); - - _ehr.registerHome(home); - } -}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopConfigurationChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopConfigurationChangeListener.java deleted file mode 100644 index b185a31c3b..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopConfigurationChangeListener.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.server.model.ConfigurationChangeListener; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.State; - -public class NoopConfigurationChangeListener implements ConfigurationChangeListener -{ - - public NoopConfigurationChangeListener() { - } - - @Override - public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) - { - } - - @Override - public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) - { - } - - @Override - public void childRemoved(ConfiguredObject<?> object, ConfiguredObject<?> child) - { - } - - @Override - public void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue, - Object newAttributeValue) - { - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java deleted file mode 100644 index 3403ec53dc..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.util.Collections; -import java.util.Map; - -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; - -public class StandardEnvironmentFacadeTest extends QpidTestCase -{ - protected File _storePath; - protected EnvironmentFacade _environmentFacade; - - protected void setUp() throws Exception - { - super.setUp(); - _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName()); - } - - protected void tearDown() throws Exception - { - try - { - super.tearDown(); - if (_environmentFacade != null) - { - _environmentFacade.close(); - } - } - finally - { - if (_storePath != null) - { - FileUtils.delete(_storePath, true); - } - } - } - - public void testEnvironmentFacade() throws Exception - { - EnvironmentFacade ef = createEnvironmentFacade(); - assertNotNull("Environment should not be null", ef); - Environment e = ef.getEnvironment(); - assertTrue("Environment is not valid", e.isValid()); - } - - public void testSecondEnvironmentFacadeUsingSamePathRejected() throws Exception - { - EnvironmentFacade ef = createEnvironmentFacade(); - assertNotNull("Environment should not be null", ef); - try - { - createEnvironmentFacade(); - fail("Exception not thrown"); - } - catch (IllegalArgumentException iae) - { - // PASS - } - - ef.close(); - - EnvironmentFacade ef2 = createEnvironmentFacade(); - assertNotNull("Environment should not be null", ef2); - } - - public void testClose() throws Exception - { - EnvironmentFacade ef = createEnvironmentFacade(); - ef.close(); - Environment e = ef.getEnvironment(); - - assertNull("Environment should be null after facade close", e); - } - - public void testOverrideJeParameter() throws Exception - { - String statCollectVarName = EnvironmentConfig.STATS_COLLECT; - - EnvironmentFacade ef = createEnvironmentFacade(); - assertEquals("false", ef.getEnvironment().getMutableConfig().getConfigParam(statCollectVarName)); - ef.close(); - - ef = createEnvironmentFacade(Collections.singletonMap(statCollectVarName, "true")); - assertEquals("true", ef.getEnvironment().getMutableConfig().getConfigParam(statCollectVarName)); - ef.close(); - } - - - public void testOpenDatabaseReusesCachedHandle() throws Exception - { - DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true); - - EnvironmentFacade ef = createEnvironmentFacade(); - Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); - assertNotNull(handle1); - - Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); - assertSame("Database handle should be cached", handle1, handle2); - - ef.closeDatabase("myDatabase"); - - Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); - assertNotSame("Expecting a new handle after database closure", handle1, handle3); - } - - EnvironmentFacade createEnvironmentFacade() - { - _environmentFacade = createEnvironmentFacade(Collections.<String, String>emptyMap()); - return _environmentFacade; - - } - - EnvironmentFacade createEnvironmentFacade(Map<String, String> map) - { - StandardEnvironmentConfiguration sec = mock(StandardEnvironmentConfiguration.class); - when(sec.getName()).thenReturn(getTestName()); - when(sec.getParameters()).thenReturn(map); - when(sec.getStorePath()).thenReturn(_storePath.getAbsolutePath()); - - return new StandardEnvironmentFacade(sec); - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java deleted file mode 100644 index b8c3b493bc..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ /dev/null @@ -1,1057 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.replication; - -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import com.sleepycat.bind.tuple.IntegerBinding; -import com.sleepycat.bind.tuple.StringBinding; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.Durability; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.Transaction; -import com.sleepycat.je.rep.NodeState; -import com.sleepycat.je.rep.ReplicaWriteException; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicatedEnvironment.State; -import com.sleepycat.je.rep.ReplicationConfig; -import com.sleepycat.je.rep.ReplicationNode; -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; -import org.codehaus.jackson.map.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.test.utils.PortHelper; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.TestFileUtils; -import org.apache.qpid.util.FileUtils; - -public class ReplicatedEnvironmentFacadeTest extends QpidTestCase -{ - private static final Logger LOGGER = LoggerFactory.getLogger(ReplicatedEnvironmentFacadeTest.class); - private static final int LISTENER_TIMEOUT = 5; - private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; - - private final PortHelper _portHelper = new PortHelper(); - - private final String TEST_GROUP_NAME = "testGroupName"; - private final String TEST_NODE_NAME = "testNodeName"; - private final int TEST_NODE_PORT = _portHelper.getNextAvailable(); - private final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT; - private final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT; - private final Durability TEST_DURABILITY = Durability.parse("SYNC,NO_SYNC,SIMPLE_MAJORITY"); - private final boolean TEST_DESIGNATED_PRIMARY = false; - private final int TEST_PRIORITY = 1; - private final int TEST_ELECTABLE_GROUP_OVERRIDE = 0; - - private File _storePath; - private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); - - public void setUp() throws Exception - { - super.setUp(); - - _storePath = TestFileUtils.createTestDirectory("bdb", true); - - setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100"); - } - - @Override - public void tearDown() throws Exception - { - try - { - for (EnvironmentFacade ef : _nodes.values()) - { - ef.close(); - } - } - finally - { - try - { - if (_storePath != null) - { - FileUtils.delete(_storePath, true); - } - } - finally - { - super.tearDown(); - } - } - - _portHelper.waitUntilAllocatedPortsAreFree(); - } - public void testEnvironmentFacade() throws Exception - { - EnvironmentFacade ef = createMaster(); - assertNotNull("Environment should not be null", ef); - Environment e = ef.getEnvironment(); - assertTrue("Environment is not valid", e.isValid()); - } - - public void testClose() throws Exception - { - EnvironmentFacade ef = createMaster(); - ef.close(); - Environment e = ef.getEnvironment(); - - assertNull("Environment should be null after facade close", e); - } - - public void testOpenDatabaseReusesCachedHandle() throws Exception - { - DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true); - - EnvironmentFacade ef = createMaster(); - Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); - assertNotNull(handle1); - - Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); - assertSame("Database handle should be cached", handle1, handle2); - - ef.closeDatabase("myDatabase"); - - Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); - assertNotSame("Expecting a new handle after database closure", handle1, handle3); - } - - public void testOpenDatabaseWhenFacadeIsNotOpened() throws Exception - { - DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true); - - EnvironmentFacade ef = createMaster(); - ef.close(); - - try - { - ef.openDatabase("myDatabase", createIfAbsentDbConfig ); - fail("Database open should fail"); - } - catch(ConnectionScopedRuntimeException e) - { - assertEquals("Unexpected exception", "Environment facade is not in opened state", e.getMessage()); - } - } - - public void testGetGroupName() throws Exception - { - assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName()); - } - - public void testGetNodeName() throws Exception - { - assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName()); - } - - public void testLastKnownReplicationTransactionId() throws Exception - { - ReplicatedEnvironmentFacade master = createMaster(); - long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId(); - assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0); - } - - public void testGetNodeHostPort() throws Exception - { - assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort()); - } - - public void testGetHelperHostPort() throws Exception - { - assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort()); - } - - public void testSetMessageStoreDurability() throws Exception - { - ReplicatedEnvironmentFacade master = createMaster(); - assertEquals("Unexpected message store durability", - new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY), - master.getRealMessageStoreDurability()); - assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability()); - assertTrue("Unexpected coalescing sync", master.isCoalescingSync()); - - master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL); - assertEquals("Unexpected message store durability", - new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL), - master.getRealMessageStoreDurability()); - assertFalse("Coalescing sync committer is still running", master.isCoalescingSync()); - } - - public void testGetNodeState() throws Exception - { - assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState()); - } - - public void testPriority() throws Exception - { - ReplicatedEnvironmentFacade facade = createMaster(); - assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority()); - Future<Void> future = facade.setPriority(TEST_PRIORITY + 1); - future.get(5, TimeUnit.SECONDS); - assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority()); - } - - public void testDesignatedPrimary() throws Exception - { - ReplicatedEnvironmentFacade master = createMaster(); - assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); - Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); - future.get(5, TimeUnit.SECONDS); - assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); - } - - public void testElectableGroupSizeOverride() throws Exception - { - ReplicatedEnvironmentFacade facade = createMaster(); - assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride()); - Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1); - future.get(5, TimeUnit.SECONDS); - assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride()); - } - - public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception - { - ReplicatedEnvironmentFacade master = createMaster(); - String nodeName2 = TEST_NODE_NAME + "_2"; - String host = "localhost"; - int port = _portHelper.getNextAvailable(); - String node2NodeHostPort = host + ":" + port; - - final AtomicInteger invocationCount = new AtomicInteger(); - final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeRecovered(ReplicationNode node) - { - nodeRecoveryLatch.countDown(); - invocationCount.incrementAndGet(); - } - }; - - createReplica(nodeName2, node2NodeHostPort, listener); - - assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers()); - - assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); - } - - public void testReplicationGroupListenerHearsNodeAdded() throws Exception - { - final CountDownLatch nodeAddedLatch = new CountDownLatch(1); - final AtomicInteger invocationCount = new AtomicInteger(); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - invocationCount.getAndIncrement(); - nodeAddedLatch.countDown(); - } - }; - - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener); - assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); - - String node2Name = TEST_NODE_NAME + "_2"; - String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable(); - replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort)); - createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); - - assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); - - assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); - } - - public void testReplicationGroupListenerHearsNodeRemoved() throws Exception - { - final CountDownLatch nodeDeletedLatch = new CountDownLatch(1); - final CountDownLatch nodeAddedLatch = new CountDownLatch(1); - final AtomicInteger invocationCount = new AtomicInteger(); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeRecovered(ReplicationNode node) - { - nodeAddedLatch.countDown(); - } - - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - nodeAddedLatch.countDown(); - } - - @Override - public void onReplicationNodeRemovedFromGroup(ReplicationNode node) - { - invocationCount.getAndIncrement(); - nodeDeletedLatch.countDown(); - } - }; - - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener); - assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - String node2Name = TEST_NODE_NAME + "_2"; - String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable(); - replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort)); - createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); - - assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); - - // Need to await the listener hearing the addition of the node to the model. - assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - // Now remove the node and ensure we hear the event - replicatedEnvironmentFacade.removeNodeFromGroup(node2Name); - - assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); - - assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); - } - - public void testMasterHearsRemoteNodeRoles() throws Exception - { - final String node2Name = TEST_NODE_NAME + "_2"; - final CountDownLatch nodeAddedLatch = new CountDownLatch(1); - final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>(); - final CountDownLatch stateLatch = new CountDownLatch(1); - final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>(); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - nodeRef.set(node); - nodeAddedLatch.countDown(); - } - - @Override - public void onNodeState(ReplicationNode node, NodeState nodeState) - { - if (node2Name.equals(node.getName())) - { - stateRef.set(nodeState); - stateLatch.countDown(); - } - } - }; - - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener); - assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable(); - replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort)); - createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); - - assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); - - assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - ReplicationNode remoteNode = (ReplicationNode)nodeRef.get(); - assertEquals("Unexpected node name", node2Name, remoteNode.getName()); - - assertTrue("Node state not fired within timeout", stateLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - assertEquals("Unexpected node state", State.REPLICA, stateRef.get().getNodeState()); - } - - public void testRemoveNodeFromGroup() throws Exception - { - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade environmentFacade = addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, stateChangeListener, new NoopReplicationGroupListener()); - assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - - String node2Name = TEST_NODE_NAME + "_2"; - String node2NodeHostPort = "localhost:" + _portHelper.getNextAvailable(); - ReplicatedEnvironmentFacade ref2 = createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); - - assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers()); - ref2.close(); - - environmentFacade.removeNodeFromGroup(node2Name); - assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers()); - } - - - public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception - { - final String replicaName = TEST_NODE_NAME + "_1"; - final CountDownLatch nodeRemovedLatch = new CountDownLatch(1); - final CountDownLatch nodeAddedLatch = new CountDownLatch(1); - final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>(); - final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>(); - final CountDownLatch stateLatch = new CountDownLatch(1); - final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>(); - - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - if (addedNodeRef.compareAndSet(null, node)) - { - nodeAddedLatch.countDown(); - } - } - - @Override - public void onReplicationNodeRemovedFromGroup(ReplicationNode node) - { - removedNodeRef.set(node); - nodeRemovedLatch.countDown(); - } - - @Override - public void onNodeState(ReplicationNode node, NodeState nodeState) - { - if (replicaName.equals(node.getName())) - { - stateRef.set(nodeState); - stateLatch.countDown(); - } - } - }; - - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - final ReplicatedEnvironmentFacade masterEnvironment = addNode(stateChangeListener, listener); - assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - masterEnvironment.setDesignatedPrimary(true); - - int replica1Port = _portHelper.getNextAvailable(); - String node1NodeHostPort = "localhost:" + replica1Port; - masterEnvironment.setPermittedNodes(Arrays.asList(masterEnvironment.getHostPort(), node1NodeHostPort)); - ReplicatedEnvironmentFacade replica = createReplica(replicaName, node1NodeHostPort, new NoopReplicationGroupListener()); - - assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); - - ReplicationNode node = addedNodeRef.get(); - assertEquals("Unexpected node name", replicaName, node.getName()); - - assertTrue("Node state was not heard", stateLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); - assertEquals("Unexpected node role", State.REPLICA, stateRef.get().getNodeState()); - assertEquals("Unexpected node name", replicaName, stateRef.get().getNodeName()); - - replica.close(); - masterEnvironment.removeNodeFromGroup(node.getName()); - - assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); - assertEquals("Unexpected node is deleted", node, removedNodeRef.get()); - } - - public void testCloseStateTransitions() throws Exception - { - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); - - assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); - replicatedEnvironmentFacade.close(); - assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); - } - - public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception - { - final CountDownLatch masterLatch = new CountDownLatch(1); - final AtomicInteger masterStateChangeCount = new AtomicInteger(); - final CountDownLatch unknownLatch = new CountDownLatch(1); - final AtomicInteger unknownStateChangeCount = new AtomicInteger(); - StateChangeListener stateChangeListener = new StateChangeListener() - { - @Override - public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException - { - if (stateChangeEvent.getState() == State.MASTER) - { - masterStateChangeCount.incrementAndGet(); - masterLatch.countDown(); - } - else if (stateChangeEvent.getState() == State.UNKNOWN) - { - unknownStateChangeCount.incrementAndGet(); - unknownLatch.countDown(); - } - } - }; - - addNode(stateChangeListener, new NoopReplicationGroupListener()); - assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - int replica1Port = _portHelper.getNextAvailable(); - String node1NodeHostPort = "localhost:" + replica1Port; - int replica2Port = _portHelper.getNextAvailable(); - String node2NodeHostPort = "localhost:" + replica2Port; - - ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); - ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener()); - - // close replicas - replica1.close(); - replica2.close(); - - assertTrue("Environment should be recreated and go into unknown state", - unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); - - assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get()); - assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); - } - - public void testTransferMasterToSelf() throws Exception - { - final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1); - final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1); - StateChangeListener stateChangeListener = new StateChangeListener(){ - - @Override - public void stateChange(StateChangeEvent event) throws RuntimeException - { - ReplicatedEnvironment.State state = event.getState(); - if (state == ReplicatedEnvironment.State.REPLICA) - { - firstNodeReplicaStateLatch.countDown(); - } - if (state == ReplicatedEnvironment.State.MASTER) - { - firstNodeMasterStateLatch.countDown(); - } - } - }; - ReplicatedEnvironmentFacade firstNode = addNode(stateChangeListener, new NoopReplicationGroupListener()); - assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); - - int replica1Port = _portHelper.getNextAvailable(); - String node1NodeHostPort = "localhost:" + replica1Port; - ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); - assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); - - int replica2Port = _portHelper.getNextAvailable(); - String node2NodeHostPort = "localhost:" + replica2Port; - final CountDownLatch replicaStateLatch = new CountDownLatch(1); - final CountDownLatch masterStateLatch = new CountDownLatch(1); - StateChangeListener testStateChangeListener = new StateChangeListener() - { - @Override - public void stateChange(StateChangeEvent event) throws RuntimeException - { - ReplicatedEnvironment.State state = event.getState(); - if (state == ReplicatedEnvironment.State.REPLICA) - { - replicaStateLatch.countDown(); - } - if (state == ReplicatedEnvironment.State.MASTER) - { - masterStateLatch.countDown(); - } - } - }; - ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, - testStateChangeListener, new NoopReplicationGroupListener()); - assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); - assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); - - thirdNode.transferMasterToSelfAsynchronously(); - assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS)); - assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); - } - - public void testTransferMasterAnotherNode() throws Exception - { - final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1); - final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1); - StateChangeListener stateChangeListener = new StateChangeListener(){ - - @Override - public void stateChange(StateChangeEvent event) throws RuntimeException - { - ReplicatedEnvironment.State state = event.getState(); - if (state == ReplicatedEnvironment.State.REPLICA) - { - firstNodeReplicaStateLatch.countDown(); - } - if (state == ReplicatedEnvironment.State.MASTER) - { - firstNodeMasterStateLatch.countDown(); - } - } - }; - ReplicatedEnvironmentFacade firstNode = addNode(stateChangeListener, new NoopReplicationGroupListener()); - assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); - - int replica1Port = _portHelper.getNextAvailable(); - String node1NodeHostPort = "localhost:" + replica1Port; - ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); - assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); - - int replica2Port = _portHelper.getNextAvailable(); - String node2NodeHostPort = "localhost:" + replica2Port; - final CountDownLatch replicaStateLatch = new CountDownLatch(1); - final CountDownLatch masterStateLatch = new CountDownLatch(1); - StateChangeListener testStateChangeListener = new StateChangeListener() - { - @Override - public void stateChange(StateChangeEvent event) throws RuntimeException - { - ReplicatedEnvironment.State state = event.getState(); - if (state == ReplicatedEnvironment.State.REPLICA) - { - replicaStateLatch.countDown(); - } - if (state == ReplicatedEnvironment.State.MASTER) - { - masterStateLatch.countDown(); - } - } - }; - String thirdNodeName = TEST_NODE_NAME + "_2"; - ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, - testStateChangeListener, new NoopReplicationGroupListener()); - assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); - assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); - - firstNode.transferMasterAsynchronously(thirdNodeName); - assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS)); - assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); - } - - public void testBeginTransaction() throws Exception - { - ReplicatedEnvironmentFacade facade = createMaster(); - Transaction txn = null; - try - { - txn = facade.beginTransaction(); - assertNotNull("Transaction is not created", txn); - txn.commit(); - txn = null; - } - finally - { - if (txn != null) - { - txn.abort(); - } - } - } - - public void testSetPermittedNodes() throws Exception - { - ReplicatedEnvironmentFacade firstNode = createMaster(); - - Set<String> permittedNodes = new HashSet<String>(); - permittedNodes.add("localhost:" + TEST_NODE_PORT); - permittedNodes.add("localhost:" + _portHelper.getNextAvailable()); - firstNode.setPermittedNodes(permittedNodes); - - ReplicatedEnvironmentFacade.ReplicationNodeImpl replicationNode = new ReplicatedEnvironmentFacade.ReplicationNodeImpl(TEST_NODE_NAME, TEST_NODE_HOST_PORT); - NodeState nodeState = ReplicatedEnvironmentFacade.getRemoteNodeState(TEST_GROUP_NAME, replicationNode, 5000); - - ObjectMapper objectMapper = new ObjectMapper(); - - Map<String, Object> settings = objectMapper.readValue(nodeState.getAppState(), Map.class); - Collection<String> appStatePermittedNodes = (Collection<String>)settings.get(ReplicatedEnvironmentFacade.PERMITTED_NODE_LIST); - assertEquals("Unexpected permitted nodes", permittedNodes, new HashSet<String>(appStatePermittedNodes)); - } - - public void testPermittedNodeIsAllowedToConnect() throws Exception - { - ReplicatedEnvironmentFacade firstNode = createMaster(); - - int replica1Port = _portHelper.getNextAvailable(); - String node1NodeHostPort = "localhost:" + replica1Port; - - Set<String> permittedNodes = new HashSet<String>(); - permittedNodes.add("localhost:" + TEST_NODE_PORT); - permittedNodes.add(node1NodeHostPort); - firstNode.setPermittedNodes(permittedNodes); - - ReplicatedEnvironmentConfiguration configuration = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME + "_1", node1NodeHostPort, false); - when(configuration.getHelperNodeName()).thenReturn(TEST_NODE_NAME); - - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.REPLICA); - ReplicatedEnvironmentFacade secondNode = createReplicatedEnvironmentFacade(TEST_NODE_NAME + "_1", - stateChangeListener, new NoopReplicationGroupListener(), configuration); - assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - assertEquals("Unexpected state", State.REPLICA.name(), secondNode.getNodeState()); - } - - public void testIntruderNodeIsDetected() throws Exception - { - final CountDownLatch intruderLatch = new CountDownLatch(1); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public boolean onIntruderNode(ReplicationNode node) - { - intruderLatch.countDown(); - return true; - } - }; - ReplicatedEnvironmentFacade firstNode = createMaster(listener); - int replica1Port = _portHelper.getNextAvailable(); - String node1NodeHostPort = "localhost:" + replica1Port; - - Set<String> permittedNodes = new HashSet<String>(); - permittedNodes.add("localhost:" + TEST_NODE_PORT); - - firstNode.setPermittedNodes(permittedNodes); - - String nodeName = TEST_NODE_NAME + "_1"; - createIntruder(nodeName, node1NodeHostPort); - assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS)); - } - - public void testNodeRolledback() throws Exception - { - DatabaseConfig createConfig = new DatabaseConfig(); - createConfig.setAllowCreate(true); - createConfig.setTransactional(true); - - ReplicatedEnvironmentFacade node1 = createMaster(); - - String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable(); - - String replicaName = TEST_NODE_NAME + 1; - ReplicatedEnvironmentFacade node2 = createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener()); - - node1.setDesignatedPrimary(true); - - Transaction txn = node1.beginTransaction(); - Database db = node1.getEnvironment().openDatabase(txn, "mydb", createConfig); - txn.commit(); - - // Put a record (that will be replicated) - putRecord(node1, db, 1, "value1"); - - node2.close(); - - // Put a record (that will be only on node1 as node2 is now offline) - putRecord(node1, db, 2, "value2"); - - db.close(); - - // Stop node1 - node1.close(); - - // Restart the node2, making it primary so it becomes master - TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(State.MASTER); - node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener()); - boolean awaitForStateChange = node2StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); - assertTrue(replicaName + " did not go into desired state; current actual state is " - + node2StateChangeListener.getCurrentActualState(), awaitForStateChange); - - txn = node2.beginTransaction(); - db = node2.getEnvironment().openDatabase(txn, "mydb", DatabaseConfig.DEFAULT); - txn.commit(); - - // Do a transaction on node2. The two environments will have diverged - putRecord(node2, db, 3, "diverged"); - - // Now restart node1 and ensure that it realises it needs to rollback before it can rejoin. - TestStateChangeListener node1StateChangeListener = new TestStateChangeListener(State.REPLICA); - final CountDownLatch _replicaRolledback = new CountDownLatch(1); - node1 = addNode(node1StateChangeListener, new NoopReplicationGroupListener() - { - @Override - public void onNodeRolledback() - { - _replicaRolledback.countDown(); - } - }); - assertTrue("Node 1 did not go into desired state and remained in state " + node1.getNodeState(), - node1StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - assertTrue("Node 1 did not experience rollback within timeout", - _replicaRolledback.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - // Finally do one more transaction through the master - putRecord(node2, db, 4, "value4"); - db.close(); - - node1.close(); - node2.close(); - } - - public void testReplicaTransactionBeginsImmediately() throws Exception - { - ReplicatedEnvironmentFacade master = createMaster(); - String nodeName2 = TEST_NODE_NAME + "_2"; - String host = "localhost"; - int port = _portHelper.getNextAvailable(); - String node2NodeHostPort = host + ":" + port; - - final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() ); - - // close the master - master.close(); - - // try to create a transaction in a separate thread - // and make sure that transaction is created immediately. - ExecutorService service = Executors.newSingleThreadExecutor(); - try - { - - Future<Transaction> future = service.submit(new Callable<Transaction>(){ - - @Override - public Transaction call() throws Exception - { - return replica.getEnvironment().beginTransaction(null, null); - } - }); - Transaction transaction = future.get(5, TimeUnit.SECONDS); - assertNotNull("Transaction was not created during expected time", transaction); - transaction.abort(); - } - finally - { - service.shutdown(); - } - } - - public void testReplicaWriteExceptionIsConvertedIntoConnectionScopedRuntimeException() throws Exception - { - ReplicatedEnvironmentFacade master = createMaster(); - String nodeName2 = TEST_NODE_NAME + "_2"; - String host = "localhost"; - int port = _portHelper.getNextAvailable(); - String node2NodeHostPort = host + ":" + port; - - final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() ); - - // close the master - master.close(); - - try - { - replica.openDatabase("test", DatabaseConfig.DEFAULT.setAllowCreate(true) ); - fail("Replica write operation should fail"); - } - catch(ReplicaWriteException e) - { - RuntimeException handledException = master.handleDatabaseException("test", e); - assertTrue("Unexpected exception", handledException instanceof ConnectionScopedRuntimeException); - } - } - - private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue, - final String dataValue) - { - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry data = new DatabaseEntry(); - - Transaction txn = master.beginTransaction(); - IntegerBinding.intToEntry(keyValue, key); - StringBinding.stringToEntry(dataValue, data); - - db.put(txn, key, data); - txn.commit(); - } - - - private void createIntruder(String nodeName, String node1NodeHostPort) - { - File environmentPathFile = new File(_storePath, nodeName); - environmentPathFile.mkdirs(); - - ReplicationConfig replicationConfig = new ReplicationConfig(TEST_GROUP_NAME, nodeName, node1NodeHostPort); - replicationConfig.setHelperHosts(TEST_NODE_HOST_PORT); - - EnvironmentConfig envConfig = new EnvironmentConfig(); - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setDurability(TEST_DURABILITY); - ReplicatedEnvironment intruder = null; - try - { - intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); - } - finally - { - if (intruder != null) - { - intruder.close(); - } - } - } - - private ReplicatedEnvironmentFacade createMaster() throws Exception - { - return createMaster(new NoopReplicationGroupListener()); - } - - private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener) throws Exception - { - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade env = addNode(stateChangeListener, replicationGroupListener); - assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - return env; - } - - private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) throws Exception - { - TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA); - return createReplica(nodeName, nodeHostPort, testStateChangeListener, replicationGroupListener); - } - - private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, - TestStateChangeListener testStateChangeListener, ReplicationGroupListener replicationGroupListener) - throws InterruptedException - { - ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, - testStateChangeListener, replicationGroupListener); - boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); - assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange); - return replicaEnvironmentFacade; - } - - private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, - StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) - { - ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); - return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config); - } - - private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, ReplicatedEnvironmentConfiguration config) { - ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); - ref.setStateChangeListener(stateChangeListener); - ref.setReplicationGroupListener(replicationGroupListener); - ref.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck()); - _nodes.put(nodeName, ref); - return ref; - } - - private ReplicatedEnvironmentFacade addNode(StateChangeListener stateChangeListener, - ReplicationGroupListener replicationGroupListener) - { - return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, - stateChangeListener, replicationGroupListener); - } - - private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) - { - ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class); - when(node.getName()).thenReturn(nodeName); - when(node.getHostPort()).thenReturn(nodeHostPort); - when(node.isDesignatedPrimary()).thenReturn(designatedPrimary); - when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); - when(node.getPriority()).thenReturn(TEST_PRIORITY); - when(node.getGroupName()).thenReturn(TEST_GROUP_NAME); - when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT); - when(node.getHelperNodeName()).thenReturn(TEST_NODE_NAME); - - when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME), anyInt())).thenReturn(60000); - when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME), anyInt())).thenReturn(10000); - when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME), anyInt())).thenReturn(1000); - when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME), anyInt())).thenReturn(3); - when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME), anyInt())).thenReturn(10000); - - Map<String, String> repConfig = new HashMap<String, String>(); - repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); - repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); - when(node.getReplicationParameters()).thenReturn(repConfig); - when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); - return node; - } - - class NoopReplicationGroupListener implements ReplicationGroupListener - { - - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - } - - @Override - public void onReplicationNodeRecovered(ReplicationNode node) - { - } - - @Override - public void onReplicationNodeRemovedFromGroup(ReplicationNode node) - { - } - - @Override - public void onNodeState(ReplicationNode node, NodeState nodeState) - { - } - - @Override - public boolean onIntruderNode(ReplicationNode node) - { - LOGGER.warn("Intruder node " + node); - return true; - } - - @Override - public void onNoMajority() - { - } - - @Override - public void onNodeRolledback() - { - } - - @Override - public void onException(Exception e) - { - } - - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java deleted file mode 100644 index 0870191b35..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.replication; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import com.sleepycat.je.rep.ReplicatedEnvironment.State; -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; - -class TestStateChangeListener implements StateChangeListener -{ - private final Set<State> _expectedStates; - private final CountDownLatch _latch; - private final AtomicReference<State> _currentActualState = new AtomicReference<State>(); - - public TestStateChangeListener(State expectedState) - { - this(Collections.singleton(expectedState)); - } - - public TestStateChangeListener(Set<State> expectedStates) - { - _expectedStates = new HashSet<State>(expectedStates); - _latch = new CountDownLatch(1); - } - - @Override - public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException - { - _currentActualState.set(stateChangeEvent.getState()); - if (_expectedStates.contains(stateChangeEvent.getState())) - { - _latch.countDown(); - } - } - - public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException - { - return _latch.await(timeout, timeUnit); - } - - public State getCurrentActualState() - { - return _currentActualState.get(); - } -}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java deleted file mode 100644 index 965cad1cb5..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import java.util.Collections; -import java.util.Map; -import junit.framework.TestCase; - -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.ConfiguredObjectRecord; - -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; - -public class ConfiguredObjectBindingTest extends TestCase -{ - - private ConfiguredObjectRecord _object; - - private static final Map<String, Object> DUMMY_ATTRIBUTES_MAP = - Collections.singletonMap("dummy",(Object) "attributes"); - - private static final String DUMMY_TYPE_STRING = "dummyType"; - private ConfiguredObjectBinding _configuredObjectBinding; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - _configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - _object = new ConfiguredObjectRecordImpl(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, - DUMMY_ATTRIBUTES_MAP); - } - - public void testObjectToEntryAndEntryToObject() - { - TupleOutput tupleOutput = new TupleOutput(); - - _configuredObjectBinding.objectToEntry(_object, tupleOutput); - - byte[] entryAsBytes = tupleOutput.getBufferBytes(); - TupleInput tupleInput = new TupleInput(entryAsBytes); - - ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput); - assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_MAP, storedObject.getAttributes()); - assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType()); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java deleted file mode 100644 index ce143aba1b..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.upgrade; - -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NONEXCLUSIVE_WITH_ERRONEOUS_OWNER; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.io.InputStream; -import java.util.UUID; - -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.subjects.TestBlankSubject; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -import com.sleepycat.je.Database; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.Transaction; - -public abstract class AbstractUpgradeTestCase extends QpidTestCase -{ - protected static final class StaticAnswerHandler implements UpgradeInteractionHandler - { - private UpgradeInteractionResponse _response; - - public StaticAnswerHandler(UpgradeInteractionResponse response) - { - _response = response; - } - - @Override - public UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse, - UpgradeInteractionResponse... possibleResponses) - { - return _response; - } - } - - public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", QUEUE_NAME, NON_DURABLE_QUEUE_NAME, - NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, PRIORITY_QUEUE_NAME, QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME + "_DLQ" }; - public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0, 0, 0, 1}; - public static int TOTAL_MESSAGE_NUMBER = 16; - protected static final LogSubject LOG_SUBJECT = new TestBlankSubject(); - - // myQueueWithDLQ_DLQ is not bound to the default exchange - protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2 - 1; - protected static final int TOTAL_EXCHANGES = 6; - - private File _storeLocation; - protected Environment _environment; - - @Override - public void setUp() throws Exception - { - super.setUp(); - _storeLocation = copyStore(getStoreDirectoryName()); - - _environment = createEnvironment(_storeLocation); - } - - /** @return eg "bdbstore-v4" - used for copying store */ - protected abstract String getStoreDirectoryName(); - - protected Environment createEnvironment(File storeLocation) - { - EnvironmentConfig envConfig = new EnvironmentConfig(); - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setConfigParam("je.lock.nLockTables", "7"); - envConfig.setReadOnly(false); - envConfig.setSharedCache(false); - envConfig.setCacheSize(0); - return new Environment(storeLocation, envConfig); - } - - @Override - public void tearDown() throws Exception - { - try - { - _environment.close(); - } - finally - { - _environment = null; - deleteDirectoryIfExists(_storeLocation); - } - super.tearDown(); - } - - private File copyStore(String storeDirectoryName) throws Exception - { - File storeLocation = new File(new File(TMP_FOLDER), "test-store"); - deleteDirectoryIfExists(storeLocation); - storeLocation.mkdirs(); - int index = 0; - String prefix = "0000000"; - String extension = ".jdb"; - InputStream is = null; - do - { - String fileName = prefix + index + extension; - is = getClass().getClassLoader().getResourceAsStream("upgrade/" + storeDirectoryName + "/test-store/" + fileName); - if (is != null) - { - FileUtils.copy(is, new File(storeLocation, fileName)); - } - index++; - } - while (is != null); - return storeLocation; - } - - protected void deleteDirectoryIfExists(File dir) - { - if (dir.exists()) - { - assertTrue("The provided file " + dir + " is not a directory", dir.isDirectory()); - - boolean deletedSuccessfully = FileUtils.delete(dir, true); - - assertTrue("Files at '" + dir + "' should have been deleted", deletedSuccessfully); - } - } - - protected void assertDatabaseRecordCount(String databaseName, final long expectedCountNumber) - { - long count = getDatabaseCount(databaseName); - assertEquals("Unexpected database '" + databaseName + "' entry number", expectedCountNumber, count); - } - - protected long getDatabaseCount(String databaseName) - { - DatabaseCallable<Long> operation = new DatabaseCallable<Long>() - { - - @Override - public Long call(Database sourceDatabase, Database targetDatabase, Transaction transaction) - { - return new Long(sourceDatabase.count()); - - } - }; - Long count = new DatabaseTemplate(_environment, databaseName, null).call(operation); - return count.longValue(); - } - - public VirtualHost getVirtualHost() - { - VirtualHost virtualHost = mock(VirtualHost.class); - when(virtualHost.getName()).thenReturn(getName()); - when(virtualHost.getId()).thenReturn(UUID.randomUUID()); - return virtualHost; - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java deleted file mode 100644 index 7ec442b73d..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.upgrade; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.isA; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import junit.framework.TestCase; - -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.Environment; -import com.sleepycat.je.Transaction; - -public class DatabaseTemplateTest extends TestCase -{ - private static final String SOURCE_DATABASE = "sourceDatabase"; - private Environment _environment; - private Database _sourceDatabase; - - @Override - public void setUp() throws Exception - { - super.setUp(); - _environment = mock(Environment.class); - _sourceDatabase = mock(Database.class); - when(_environment.openDatabase(any(Transaction.class), same(SOURCE_DATABASE), isA(DatabaseConfig.class))) - .thenReturn(_sourceDatabase); - } - - public void testExecuteWithTwoDatabases() - { - String targetDatabaseName = "targetDatabase"; - Database targetDatabase = mock(Database.class); - - Transaction txn = mock(Transaction.class); - - when(_environment.openDatabase(same(txn), same(targetDatabaseName), isA(DatabaseConfig.class))) - .thenReturn(targetDatabase); - - DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, targetDatabaseName, txn); - - DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class); - databaseTemplate.run(databaseOperation); - - verify(databaseOperation).run(_sourceDatabase, targetDatabase, txn); - verify(_sourceDatabase).close(); - verify(targetDatabase).close(); - } - - public void testExecuteWithOneDatabases() - { - DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, null, null); - - DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class); - databaseTemplate.run(databaseOperation); - - verify(databaseOperation).run(_sourceDatabase, (Database)null, (Transaction)null); - verify(_sourceDatabase).close(); - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java deleted file mode 100644 index d0f9455d9a..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.upgrade; - -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NONEXCLUSIVE_WITH_ERRONEOUS_OWNER; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingRecord; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingTuple; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKey; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKeyBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKeyBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueRecord; - -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.Transaction; - -public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase -{ - private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName"; - private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName"; - private static final String EXCHANGE_DB_NAME = "exchangeDb_v5"; - private static final String MESSAGE_META_DATA_DB_NAME = "messageMetaDataDb_v5"; - private static final String MESSAGE_CONTENT_DB_NAME = "messageContentDb_v5"; - private static final String DELIVERY_DB_NAME = "deliveryDb_v5"; - private static final String BINDING_DB_NAME = "queueBindingsDb_v5"; - - @Override - protected String getStoreDirectoryName() - { - return "bdbstore-v4"; - } - - public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception - { - UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost()); - - assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES))); - - assertDatabaseRecordCount(DELIVERY_DB_NAME, TOTAL_MESSAGE_NUMBER); - assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, TOTAL_MESSAGE_NUMBER); - assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES); - - for (int i = 0; i < QUEUE_SIZES.length; i++) - { - assertQueueMessages(QUEUE_NAMES[i], QUEUE_SIZES[i]); - } - - final List<BindingRecord> queueBindings = loadBindings(); - - assertEquals("Unxpected bindings size", TOTAL_BINDINGS, queueBindings.size()); - assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", TOPIC_NAME, ""); - assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", SELECTOR_TOPIC_NAME, "testprop='true'"); - assertBindingRecord(queueBindings, QUEUE_NAME, "amq.direct", QUEUE_NAME, null); - assertBindingRecord(queueBindings, NON_DURABLE_QUEUE_NAME, "amq.direct", NON_DURABLE_QUEUE_NAME, null); - assertBindingRecord(queueBindings, NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, null); - - assertQueueHasOwner(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); - - assertContent(); - } - - public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception - { - UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHost()); - HashSet<String> queues = new HashSet<String>(Arrays.asList(QUEUE_NAMES)); - assertTrue(NON_DURABLE_QUEUE_NAME + " should be in the list of queues" , queues.remove(NON_DURABLE_QUEUE_NAME)); - - assertQueues(queues); - - assertDatabaseRecordCount(DELIVERY_DB_NAME, 13); - assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 13); - assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES); - - assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE, 1); - assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, 1); - assertQueueMessages(QUEUE_NAME, 10); - assertQueueMessages(QUEUE_WITH_DLQ_NAME + "_DLQ", 1); - - final List<BindingRecord> queueBindings = loadBindings(); - - assertEquals("Unxpected list size", TOTAL_BINDINGS - 2, queueBindings.size()); - assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", TOPIC_NAME, ""); - assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", - SELECTOR_TOPIC_NAME, "testprop='true'"); - assertBindingRecord(queueBindings, QUEUE_NAME, "amq.direct", QUEUE_NAME, null); - - assertQueueHasOwner(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); - - assertContent(); - } - - private List<BindingRecord> loadBindings() - { - final BindingTuple bindingTuple = new BindingTuple(); - final List<BindingRecord> queueBindings = new ArrayList<BindingRecord>(); - CursorOperation databaseOperation = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - BindingRecord bindingRecord = bindingTuple.entryToObject(key); - - AMQShortString queueName = bindingRecord.getQueueName(); - AMQShortString exchangeName = bindingRecord.getExchangeName(); - AMQShortString routingKey = bindingRecord.getRoutingKey(); - FieldTable arguments = bindingRecord.getArguments(); - queueBindings.add(new BindingRecord(exchangeName, queueName, routingKey, arguments)); - } - }; - new DatabaseTemplate(_environment, BINDING_DB_NAME, null).run(databaseOperation); - return queueBindings; - } - - private void assertBindingRecord(List<BindingRecord> queueBindings, String queueName, String exchangeName, - String routingKey, String selectorKey) - { - BindingRecord record = null; - for (BindingRecord bindingRecord : queueBindings) - { - if (bindingRecord.getQueueName().asString().equals(queueName) - && bindingRecord.getExchangeName().asString().equals(exchangeName)) - { - record = bindingRecord; - break; - } - } - assertNotNull("Binding is not found for queue " + queueName + " and exchange " + exchangeName, record); - assertEquals("Unexpected routing key", routingKey, record.getRoutingKey().asString()); - - if (selectorKey != null) - { - assertEquals("Unexpected selector key for " + queueName, selectorKey, - record.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue())); - } - } - - private void assertQueueMessages(final String queueName, final int expectedQueueSize) - { - final Set<Long> messageIdsForQueue = assertDeliveriesForQueue(queueName, expectedQueueSize); - - assertMetadataForQueue(queueName, expectedQueueSize, messageIdsForQueue); - - assertContentForQueue(queueName, expectedQueueSize, messageIdsForQueue); - } - - private Set<Long> assertDeliveriesForQueue(final String queueName, final int expectedQueueSize) - { - final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding(); - final AtomicInteger deliveryCounter = new AtomicInteger(); - final Set<Long> messagesForQueue = new HashSet<Long>(); - - CursorOperation deliveryDatabaseOperation = new CursorOperation() - { - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key); - String thisQueueName = entryKey.getQueueName().asString(); - if (thisQueueName.equals(queueName)) - { - deliveryCounter.incrementAndGet(); - messagesForQueue.add(entryKey.getMessageId()); - } - } - }; - new DatabaseTemplate(_environment, DELIVERY_DB_NAME, null).run(deliveryDatabaseOperation); - - assertEquals("Unxpected number of entries in delivery db for queue " + queueName, expectedQueueSize, - deliveryCounter.get()); - - return messagesForQueue; - } - - private void assertMetadataForQueue(final String queueName, final int expectedQueueSize, - final Set<Long> messageIdsForQueue) - { - final AtomicInteger metadataCounter = new AtomicInteger(); - CursorOperation databaseOperation = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - Long messageId = LongBinding.entryToLong(key); - - boolean messageIsForTheRightQueue = messageIdsForQueue.contains(messageId); - if (messageIsForTheRightQueue) - { - metadataCounter.incrementAndGet(); - } - } - }; - new DatabaseTemplate(_environment, MESSAGE_META_DATA_DB_NAME, null).run(databaseOperation); - - assertEquals("Unxpected number of entries in metadata db for queue " + queueName, expectedQueueSize, - metadataCounter.get()); - } - - private void assertContentForQueue(String queueName, int expectedQueueSize, final Set<Long> messageIdsForQueue) - { - final AtomicInteger contentCounter = new AtomicInteger(); - final MessageContentKeyBinding keyBinding = new MessageContentKeyBinding(); - CursorOperation cursorOperation = new CursorOperation() - { - private long _prevMsgId = -1; - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - MessageContentKey contentKey = keyBinding.entryToObject(key); - long msgId = contentKey.getMessageId(); - - if (_prevMsgId != msgId && messageIdsForQueue.contains(msgId)) - { - contentCounter.incrementAndGet(); - } - - _prevMsgId = msgId; - } - }; - new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(cursorOperation); - - assertEquals("Unxpected number of entries in content db for queue " + queueName, expectedQueueSize, - contentCounter.get()); - } - - private void assertQueues(Set<String> expectedQueueNames) - { - List<AMQShortString> durableSubNames = Collections.emptyList(); - final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames); - final Set<String> actualQueueNames = new HashSet<String>(); - - CursorOperation queueNameCollector = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - QueueRecord record = binding.entryToObject(value); - String queueName = record.getNameShortString().asString(); - actualQueueNames.add(queueName); - } - }; - new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector); - - assertEquals("Unexpected queue names", expectedQueueNames, actualQueueNames); - } - - private void assertQueueHasOwner(String queueName, final String expectedOwner) - { - List<AMQShortString> durableSubNames = Collections.emptyList(); - final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames); - final AtomicReference<String> actualOwner = new AtomicReference<String>(); - final AtomicBoolean foundQueue = new AtomicBoolean(false); - - CursorOperation queueNameCollector = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - QueueRecord record = binding.entryToObject(value); - String queueName = record.getNameShortString().asString(); - if (queueName.equals(queueName)) - { - foundQueue.set(true); - actualOwner.set(AMQShortString.toString(record.getOwner())); - } - } - }; - new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector); - - assertTrue("Could not find queue in database", foundQueue.get()); - assertEquals("Queue has unexpected owner", expectedOwner, actualOwner.get()); - } - - private void assertContent() - { - final UpgradeFrom4To5.ContentBinding contentBinding = new UpgradeFrom4To5.ContentBinding(); - CursorOperation contentCursorOperation = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key, - DatabaseEntry value) - { - long id = LongBinding.entryToLong(key); - assertTrue("Unexpected id", id > 0); - ByteBuffer content = contentBinding.entryToObject(value); - assertNotNull("Unexpected content", content); - } - }; - new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(contentCursorOperation); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java deleted file mode 100644 index 6351ee5205..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ /dev/null @@ -1,445 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.upgrade; - -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; -import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CONFIGURED_OBJECTS_DB_NAME; -import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_CONTENT_DB_NAME; -import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_DELIVERY_DB_NAME; -import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_METADATA_DB_NAME; -import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_XID_DB_NAME; -import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_CONTENT_DB_NAME; -import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_XID_DB_NAME; - -import java.io.File; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; - -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.Environment; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.Transaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.store.Xid; -import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.ConfiguredObjectBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransaction; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransactionBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewRecordImpl; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransaction; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransactionBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldRecordImpl; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeUUIDBinding; -import org.apache.qpid.server.util.MapJsonSerializer; - -public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(UpgradeFrom5To6Test.class); - private static final String ARGUMENTS = "arguments"; - - @Override - protected String getStoreDirectoryName() - { - return "bdbstore-v5"; - } - - public void testPerformUpgrade() throws Exception - { - UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); - - assertDatabaseRecordCounts(); - assertContent(); - - assertConfiguredObjects(); - assertQueueEntries(); - } - - public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception - { - corruptDatabase(); - - UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost()); - - assertDatabaseRecordCounts(); - - assertConfiguredObjects(); - assertQueueEntries(); - } - - public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception - { - corruptDatabase(); - - UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - - UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO); - - upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHost()); - - assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); - assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12); - - assertConfiguredObjects(); - assertQueueEntries(); - } - - public void testPerformXidUpgrade() throws Exception - { - File storeLocation = new File(TMP_FOLDER, getName()); - storeLocation.mkdirs(); - Environment environment = createEnvironment(storeLocation); - try - { - populateOldXidEntries(environment); - UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); - assertXidEntries(environment); - } - finally - { - try - { - environment.close(); - } - finally - { - deleteDirectoryIfExists(storeLocation); - } - - } - } - - private void assertXidEntries(Environment environment) - { - final DatabaseEntry value = new DatabaseEntry(); - final DatabaseEntry key = getXidKey(); - new DatabaseTemplate(environment, NEW_XID_DB_NAME, null).run(new DatabaseRunnable() - { - - @Override - public void run(Database xidDatabase, Database nullDatabase, Transaction transaction) - { - xidDatabase.get(null, key, value, LockMode.DEFAULT); - } - }); - NewPreparedTransactionBinding newBinding = new NewPreparedTransactionBinding(); - NewPreparedTransaction newTransaction = newBinding.entryToObject(value); - NewRecordImpl[] newEnqueues = newTransaction.getEnqueues(); - NewRecordImpl[] newDequeues = newTransaction.getDequeues(); - assertEquals("Unxpected new enqueus number", 1, newEnqueues.length); - NewRecordImpl enqueue = newEnqueues[0]; - assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHost().getName()), enqueue.getId()); - assertEquals("Unxpected message id", 1, enqueue.getMessageNumber()); - assertEquals("Unxpected new dequeues number", 1, newDequeues.length); - NewRecordImpl dequeue = newDequeues[0]; - assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHost().getName()), dequeue.getId()); - assertEquals("Unxpected message id", 2, dequeue.getMessageNumber()); - } - - private void populateOldXidEntries(Environment environment) - { - - final DatabaseEntry value = new DatabaseEntry(); - OldRecordImpl[] enqueues = { new OldRecordImpl("TEST1", 1) }; - OldRecordImpl[] dequeues = { new OldRecordImpl("TEST2", 2) }; - OldPreparedTransaction oldPreparedTransaction = new OldPreparedTransaction(enqueues, dequeues); - OldPreparedTransactionBinding oldPreparedTransactionBinding = new OldPreparedTransactionBinding(); - oldPreparedTransactionBinding.objectToEntry(oldPreparedTransaction, value); - - final DatabaseEntry key = getXidKey(); - new DatabaseTemplate(environment, OLD_XID_DB_NAME, null).run(new DatabaseRunnable() - { - - @Override - public void run(Database xidDatabase, Database nullDatabase, Transaction transaction) - { - xidDatabase.put(null, key, value); - } - }); - } - - protected DatabaseEntry getXidKey() - { - final DatabaseEntry value = new DatabaseEntry(); - byte[] globalId = { 1 }; - byte[] branchId = { 2 }; - Xid xid = new Xid(1l, globalId, branchId); - XidBinding xidBinding = XidBinding.getInstance(); - xidBinding.objectToEntry(xid, value); - return value; - } - - private void assertQueueEntries() - { - final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - final NewQueueEntryBinding newBinding = new NewQueueEntryBinding(); - CursorOperation cursorOperation = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - NewQueueEntryKey newEntryRecord = newBinding.entryToObject(key); - assertTrue("Unexpected queue id", configuredObjects.containsKey(newEntryRecord.getQueueId())); - } - }; - new DatabaseTemplate(_environment, NEW_DELIVERY_DB_NAME, null).run(cursorOperation); - } - - /** - * modify the chunk offset of a message to be wrong, so we can test logic - * that preserves incomplete messages - */ - private void corruptDatabase() - { - CursorOperation cursorOperation = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - CompoundKeyBinding binding = new CompoundKeyBinding(); - CompoundKey originalCompoundKey = binding.entryToObject(key); - int corruptedOffset = originalCompoundKey.getOffset() + 2; - CompoundKey corruptedCompoundKey = new CompoundKey(originalCompoundKey.getMessageId(), corruptedOffset); - DatabaseEntry newKey = new DatabaseEntry(); - binding.objectToEntry(corruptedCompoundKey, newKey); - - _logger.info("Deliberately corrupted message id " + originalCompoundKey.getMessageId() - + ", changed offset from " + originalCompoundKey.getOffset() + " to " - + corruptedCompoundKey.getOffset()); - - deleteCurrent(); - sourceDatabase.put(transaction, newKey, value); - - abort(); - } - }; - - Transaction transaction = _environment.beginTransaction(null, null); - new DatabaseTemplate(_environment, OLD_CONTENT_DB_NAME, transaction).run(cursorOperation); - transaction.commit(); - } - - private void assertDatabaseRecordCounts() - { - assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 21); - assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 13); - - assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 13); - assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 13); - } - - private void assertConfiguredObjects() - { - Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - assertEquals("Unexpected number of configured objects", 21, configuredObjects.size()); - - Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(12); - List<UUID> expectedBindingIDs = new ArrayList<UUID>(); - - expected.add(createExpectedQueueMap("myUpgradeQueue", Boolean.FALSE, null, null)); - expected.add(createExpectedQueueMap("clientid:mySelectorDurSubName", Boolean.TRUE, "clientid", null)); - expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null)); - - final Map<String, Object> queueWithOwnerArguments = new HashMap<String, Object>(); - queueWithOwnerArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES, 10); - queueWithOwnerArguments.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, "misused-owner-as-description"); - expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null,queueWithOwnerArguments)); - - final Map<String, Object> priorityQueueArguments = new HashMap<String, Object>(); - priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES, 10); - expected.add(createExpectedQueueMap(PRIORITY_QUEUE_NAME, Boolean.FALSE, null, priorityQueueArguments)); - - final Map<String, Object> queueWithDLQArguments = new HashMap<String, Object>(); - queueWithDLQArguments.put("x-qpid-dlq-enabled", true); - queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2); - expected.add(createExpectedQueueMap(QUEUE_WITH_DLQ_NAME, Boolean.FALSE, null, queueWithDLQArguments)); - - final Map<String, Object> dlqArguments = new HashMap<String, Object>(); - dlqArguments.put("x-qpid-dlq-enabled", false); - dlqArguments.put("x-qpid-maximum-delivery-count", 0); - expected.add(createExpectedQueueMap(QUEUE_WITH_DLQ_NAME + "_DLQ", Boolean.FALSE, null, dlqArguments)); - expected.add(createExpectedExchangeMap(QUEUE_WITH_DLQ_NAME + "_DLE", "fanout")); - - expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue","myUpgradeQueue", "<<default>>", null, expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue", "myUpgradeQueue", "amq.direct", null, expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID("clientid:myDurSubName", "myUpgradeTopic", "amq.topic", - Collections.singletonMap("x-filter-jms-selector", ""), expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID("clientid:mySelectorDurSubName", "mySelectorUpgradeTopic", "amq.topic", - Collections.singletonMap("x-filter-jms-selector", "testprop='true'"), expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID("clientid:myDurSubName", "clientid:myDurSubName", "<<default>>", null, expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID("clientid:mySelectorDurSubName", "clientid:mySelectorDurSubName", "<<default>>", null, expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner", "nonexclusive-with-erroneous-owner", "amq.direct", null, expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner","nonexclusive-with-erroneous-owner", "<<default>>", null, expectedBindingIDs)); - - expected.add(createExpectedQueueBindingMapAndID(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_NAME, "<<default>>", null, expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_NAME, "amq.direct", null, expectedBindingIDs)); - - expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME, "<<default>>", null, expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME, "amq.direct", null, expectedBindingIDs)); - expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME + "_DLQ", "dlq", QUEUE_WITH_DLQ_NAME + "_DLE", null, expectedBindingIDs)); - - Set<String> expectedTypes = new HashSet<String>(); - expectedTypes.add(Queue.class.getName()); - expectedTypes.add(Exchange.class.getName()); - expectedTypes.add(Binding.class.getName()); - MapJsonSerializer jsonSerializer = new MapJsonSerializer(); - for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet()) - { - UpgradeConfiguredObjectRecord object = entry.getValue(); - Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes()); - - assertTrue("Unexpected entry in a store - json [" + object.getAttributes() + "], map [" + deserialized + "]", - expected.remove(deserialized)); - String type = object.getType(); - assertTrue("Unexpected type:" + type, expectedTypes.contains(type)); - UUID key = entry.getKey(); - - assertNotNull("Key cannot be null", key); - - if (type.equals(Exchange.class.getName())) - { - String exchangeName = (String) deserialized.get(Exchange.NAME); - assertNotNull(exchangeName); - assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName())); - } - else if (type.equals(Queue.class.getName())) - { - String queueName = (String) deserialized.get(Queue.NAME); - assertNotNull(queueName); - assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName())); - } - else if (type.equals(Binding.class.getName())) - { - assertTrue("unexpected binding id", expectedBindingIDs.remove(key)); - } - } - - assertTrue("Not all expected configured objects found:" + expected, expected.isEmpty()); - assertTrue("Not all expected bindings found:" + expectedBindingIDs, expectedBindingIDs.isEmpty()); - } - - private Map<String, Object> createExpectedQueueBindingMapAndID(String queue, String bindingName, String exchangeName, Map<String, String> argumentMap, List<UUID> expectedBindingIDs) - { - Map<String, Object> expectedQueueBinding = new HashMap<String, Object>(); - expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHost().getName()).toString()); - expectedQueueBinding.put(Binding.NAME, bindingName); - expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()).toString()); - if (argumentMap != null) - { - expectedQueueBinding.put(Binding.ARGUMENTS, argumentMap); - } - - expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHost().getName())); - - return expectedQueueBinding; - } - - private Map<String, Object> createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map<String, Object> argumentMap) - { - Map<String, Object> expectedQueueEntry = new HashMap<String, Object>(); - expectedQueueEntry.put(Queue.NAME, name); - expectedQueueEntry.put(Queue.EXCLUSIVE, exclusiveFlag); - expectedQueueEntry.put(Queue.OWNER, owner); - if (argumentMap != null) - { - expectedQueueEntry.put(ARGUMENTS, argumentMap); - } - return expectedQueueEntry; - } - - private Map<String, Object> createExpectedExchangeMap(String name, String type) - { - Map<String, Object> expectedExchnageEntry = new HashMap<String, Object>(); - expectedExchnageEntry.put(Exchange.NAME, name); - expectedExchnageEntry.put(Exchange.TYPE, type); - expectedExchnageEntry.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name()); - return expectedExchnageEntry; - } - - private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects() - { - final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>(); - final ConfiguredObjectBinding binding = new ConfiguredObjectBinding(); - final UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding(); - CursorOperation configuredObjectsCursor = new CursorOperation() - { - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - UUID id = uuidBinding.entryToObject(key); - UpgradeConfiguredObjectRecord object = binding.entryToObject(value); - configuredObjectsRecords.put(id, object); - } - }; - new DatabaseTemplate(_environment, CONFIGURED_OBJECTS_DB_NAME, null).run(configuredObjectsCursor); - return configuredObjectsRecords; - } - - private void assertContent() - { - final NewDataBinding contentBinding = new NewDataBinding(); - CursorOperation contentCursorOperation = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - long id = LongBinding.entryToLong(key); - assertTrue("Unexpected id", id > 0); - byte[] content = contentBinding.entryToObject(value); - assertNotNull("Unexpected content", content); - } - }; - new DatabaseTemplate(_environment, NEW_CONTENT_DB_NAME, null).run(contentCursorOperation); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java deleted file mode 100644 index fc7142e9e4..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java +++ /dev/null @@ -1,378 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.upgrade; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; - -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.util.MapJsonSerializer; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.Transaction; - -public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase -{ - private static final String ARGUMENTS = "arguments"; - - private static final String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; - private static final String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY"; - - @Override - public VirtualHost<?,?,?> getVirtualHost() - { - VirtualHost<?,?,?> virtualHost = mock(VirtualHost.class); - when(virtualHost.getName()).thenReturn("test"); - return virtualHost; - } - - @Override - protected String getStoreDirectoryName() - { - return "bdbstore-v7"; - } - - public void testPerformUpgrade() throws Exception - { - UpgradeFrom7To8 upgrade = new UpgradeFrom7To8(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); - - assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 11); - assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 13); - - assertConfiguredObjects(); - assertConfiguredObjectHierarchy(); - } - - - private void assertConfiguredObjectHierarchy() - { - Map<UpgradeHierarchyKey, UUID> hierarchy = loadConfiguredObjectHierarchy(); - assertEquals("Unexpected number of configured objects", 13, hierarchy.size()); - - UUID vhUuid = UUIDGenerator.generateVhostUUID(getVirtualHost().getName()); - UUID myExchUuid = UUIDGenerator.generateExchangeUUID("myexch", getVirtualHost().getName()); - UUID amqDirectUuid = UUIDGenerator.generateExchangeUUID("amq.direct", getVirtualHost().getName()); - UUID queue1Uuid = UUIDGenerator.generateQueueUUID("queue1", getVirtualHost().getName()); - UUID queue1ToAmqDirectBindingUuid = UUIDGenerator.generateBindingUUID("amq.direct", "queue1", "queue1", getVirtualHost().getName()); - - // myexch -> virtualhost - UpgradeHierarchyKey myExchToVhParent = new UpgradeHierarchyKey(myExchUuid, VirtualHost.class.getSimpleName()); - assertExpectedHierarchyEntry(hierarchy, myExchToVhParent, vhUuid); - - // queue1 -> virtualhost - UpgradeHierarchyKey queue1ToVhParent = new UpgradeHierarchyKey(queue1Uuid, VirtualHost.class.getSimpleName()); - assertExpectedHierarchyEntry(hierarchy, queue1ToVhParent, vhUuid); - - // queue1binding -> amq.direct - // queue1binding -> queue1 - UpgradeHierarchyKey queue1BindingToAmqDirect = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Exchange.class.getSimpleName()); - UpgradeHierarchyKey queue1BindingToQueue1 = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Queue.class.getSimpleName()); - assertExpectedHierarchyEntry(hierarchy, queue1BindingToAmqDirect, amqDirectUuid); - assertExpectedHierarchyEntry(hierarchy, queue1BindingToQueue1, queue1Uuid); - - String[] defaultExchanges = {"amq.topic", "amq.fanout", "amq.direct", "amq.match"}; - for (String exchangeName : defaultExchanges) - { - UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()); - UpgradeHierarchyKey exchangeParent = new UpgradeHierarchyKey(id, VirtualHost.class.getSimpleName()); - assertExpectedHierarchyEntry(hierarchy, exchangeParent, vhUuid); - } - - } - - private void assertExpectedHierarchyEntry( - Map<UpgradeHierarchyKey, UUID> hierarchy, - UpgradeHierarchyKey childHierarchyKey, UUID parentUUID) - { - assertTrue("Expected hierarchy entry does not exist", hierarchy.containsKey(childHierarchyKey)); - assertEquals("Expected hierarchy entry does not exist", parentUUID, hierarchy.get(childHierarchyKey)); - } - - - private void assertConfiguredObjects() - { - Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - assertEquals("Unexpected number of configured objects", 11, configuredObjects.size()); - - Map<UUID, Map<String, Object>> expected = new HashMap<UUID, Map<String, Object>>(); - - String configVersion = "0.3"; - expected.putAll(createExpectedVirtualHost(configVersion)); - - expected.putAll(createExpectedQueue("queue1", Boolean.FALSE, null, null)); - expected.putAll(createExpectedQueue("queue2", Boolean.FALSE, null, null)); - - expected.putAll(createExpectedExchangeMap("myexch", "direct")); - - expected.putAll(createExpectedBindingMap("queue1", "queue1", "amq.direct", null)); - expected.putAll(createExpectedBindingMap("queue1", "queue1", "myexch", null)); - expected.putAll(createExpectedBindingMap("queue2", "queue2", "amq.fanout", null)); - - expected.putAll(createExpectedExchangeMap("amq.direct", "direct")); - expected.putAll(createExpectedExchangeMap("amq.fanout", "fanout")); - expected.putAll(createExpectedExchangeMap("amq.match", "headers")); - expected.putAll(createExpectedExchangeMap("amq.topic", "topic")); - - MapJsonSerializer jsonSerializer = new MapJsonSerializer(); - for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet()) - { - UpgradeConfiguredObjectRecord object = entry.getValue(); - - UUID actualKey = entry.getKey(); - String actualType = object.getType(); - String actualJson = object.getAttributes(); - Map<String, Object> actualDeserializedAttributes = jsonSerializer.deserialize(actualJson); - - assertTrue("Entry UUID " + actualKey + " of type " + actualType + " is unexpected", expected.containsKey(actualKey)); - - Map<String, Object> expectedDeserializedAttributes = expected.get(actualKey); - - assertEquals("Entry UUID " + actualKey + " of type " + actualType + " has uenxpected deserialised value, json was: " + actualJson, - expectedDeserializedAttributes, actualDeserializedAttributes); - } - } - - private Map<UUID, Map<String, Object>> createExpectedVirtualHost(String modelVersion) - { - Map<String, Object> expectedVirtualHostEntry = new HashMap<String, Object>(); - expectedVirtualHostEntry.put("modelVersion", modelVersion); - expectedVirtualHostEntry.put(VirtualHost.NAME, getVirtualHost().getName()); - - UUID expectedUUID = UUIDGenerator.generateVhostUUID(getVirtualHost().getName()); - return Collections.singletonMap(expectedUUID, expectedVirtualHostEntry); - } - - private Map<UUID, Map<String, Object>> createExpectedQueue(String queueName, boolean exclusiveFlag, String owner, Map<String, Object> argumentMap) - { - Map<String, Object> expectedQueueEntry = new HashMap<String, Object>(); - expectedQueueEntry.put(Queue.NAME, queueName); - expectedQueueEntry.put(Queue.EXCLUSIVE, exclusiveFlag); - expectedQueueEntry.put(Queue.OWNER, owner); - expectedQueueEntry.put(Queue.TYPE, "standard"); - - if (argumentMap != null) - { - expectedQueueEntry.put(ARGUMENTS, argumentMap); - } - UUID expectedUUID = UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName()); - return Collections.singletonMap(expectedUUID, expectedQueueEntry); - } - - private Map<UUID, Map<String, Object>> createExpectedExchangeMap(String exchangeName, String type) - { - Map<String, Object> expectedExchangeMap = new HashMap<String, Object>(); - expectedExchangeMap.put(Exchange.NAME, exchangeName); - expectedExchangeMap.put(Exchange.TYPE, type); - expectedExchangeMap.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name()); - UUID expectedUUID = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()); - return Collections.singletonMap(expectedUUID, expectedExchangeMap); - } - - private Map<UUID, Map<String, Object>> createExpectedBindingMap(String queueName, String bindingName, String exchangeName, Map<String, String> argumentMap) - { - Map<String, Object> expectedBinding = new HashMap<String, Object>(); - expectedBinding.put(Binding.NAME, bindingName); - expectedBinding.put(Binding.ARGUMENTS, argumentMap == null ? Collections.emptyMap() : argumentMap); - - UUID expectedUUID = UUIDGenerator.generateBindingUUID(exchangeName, queueName, bindingName, getVirtualHost().getName()); - return Collections.singletonMap(expectedUUID, expectedBinding); - } - - private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects() - { - final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>(); - final UpgradeConfiguredObjectBinding binding = new UpgradeConfiguredObjectBinding(); - final UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding(); - CursorOperation configuredObjectsCursor = new CursorOperation() - { - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - UUID id = uuidBinding.entryToObject(key); - UpgradeConfiguredObjectRecord object = binding.entryToObject(value); - configuredObjectsRecords.put(id, object); - } - }; - new DatabaseTemplate(_environment, CONFIGURED_OBJECTS_DB_NAME, null).run(configuredObjectsCursor); - return configuredObjectsRecords; - } - - - private Map<UpgradeHierarchyKey, UUID> loadConfiguredObjectHierarchy() - { - final Map<UpgradeHierarchyKey, UUID> hierarchyRecords = new HashMap<UpgradeHierarchyKey, UUID>(); - final UpgradeHierarchyKeyBinding hierarchyKeyBinding = new UpgradeHierarchyKeyBinding(); - final UpgradeUUIDBinding uuidParentBinding = new UpgradeUUIDBinding(); - CursorOperation hierarchyCursor = new CursorOperation() - { - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, - DatabaseEntry key, DatabaseEntry value) - { - UpgradeHierarchyKey hierarchyKey = hierarchyKeyBinding.entryToObject(key); - UUID parentId = uuidParentBinding.entryToObject(value); - hierarchyRecords.put(hierarchyKey, parentId); - } - }; - new DatabaseTemplate(_environment, CONFIGURED_OBJECT_HIERARCHY_DB_NAME, null).run(hierarchyCursor); - return hierarchyRecords; - } - - private static class UpgradeConfiguredObjectBinding extends TupleBinding<UpgradeConfiguredObjectRecord> - { - @Override - public UpgradeConfiguredObjectRecord entryToObject(TupleInput tupleInput) - { - String type = tupleInput.readString(); - String json = tupleInput.readString(); - UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(type, json); - return configuredObject; - } - - @Override - public void objectToEntry(UpgradeConfiguredObjectRecord object, TupleOutput tupleOutput) - { - throw new UnsupportedOperationException(); - } - } - - private static class UpgradeConfiguredObjectRecord - { - private final String _attributes; - private final String _type; - - public UpgradeConfiguredObjectRecord(String type, String attributes) - { - super(); - _attributes = attributes; - _type = type; - } - - public String getAttributes() - { - return _attributes; - } - - public String getType() - { - return _type; - } - - } - - private static class UpgradeUUIDBinding extends TupleBinding<UUID> - { - @Override - public UUID entryToObject(final TupleInput tupleInput) - { - return new UUID(tupleInput.readLong(), tupleInput.readLong()); - } - - @Override - public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput) - { - throw new UnsupportedOperationException(); - } - } - - private static class UpgradeHierarchyKeyBinding extends TupleBinding<UpgradeHierarchyKey> - { - @Override - public UpgradeHierarchyKey entryToObject(TupleInput tupleInput) - { - UUID childId = new UUID(tupleInput.readLong(), tupleInput.readLong()); - String parentType = tupleInput.readString(); - - return new UpgradeHierarchyKey(childId, parentType); - } - - @Override - public void objectToEntry(UpgradeHierarchyKey hk, TupleOutput tupleOutput) - { - throw new UnsupportedOperationException(); - } - } - - private static class UpgradeHierarchyKey - { - private final UUID _childId; - private final String _parentType; - - public UpgradeHierarchyKey(final UUID childId, final String parentType) - { - _childId = childId; - _parentType = parentType; - } - - @Override - public boolean equals(final Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - final UpgradeHierarchyKey that = (UpgradeHierarchyKey) o; - - if (!_childId.equals(that._childId)) - { - return false; - } - if (!_parentType.equals(that._parentType)) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = _childId.hashCode(); - result = 31 * result + _parentType.hashCode(); - return result; - } - - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java deleted file mode 100644 index 5b869b67dc..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.upgrade; - -import com.sleepycat.bind.tuple.IntegerBinding; -import com.sleepycat.je.Cursor; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.OperationStatus; - -import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; -import org.apache.qpid.server.util.ServerScopedRuntimeException; - -public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase -{ - private Upgrader _upgrader; - - @Override - protected String getStoreDirectoryName() - { - return "bdbstore-v999"; - } - - @Override - public void setUp() throws Exception - { - super.setUp(); - _upgrader = new Upgrader(_environment, getVirtualHost()); - } - - private int getStoreVersion() - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - int storeVersion = -1; - Database versionDb = null; - Cursor cursor = null; - try - { - versionDb = _environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig); - cursor = versionDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) - { - int version = IntegerBinding.entryToInt(key); - if (storeVersion < version) - { - storeVersion = version; - } - } - } - finally - { - if (cursor != null) - { - cursor.close(); - } - if (versionDb != null) - { - versionDb.close(); - } - } - return storeVersion; - } - - public void testUpgrade() throws Exception - { - assertEquals("Unexpected store version", 999, getStoreVersion()); - try - { - _upgrader.upgradeIfNecessary(); - fail("Store should not be able to be upgraded"); - } - catch(ServerScopedRuntimeException ex) - { - assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: " - + BDBConfigurationStore.VERSION, ex.getMessage()); - } - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java deleted file mode 100644 index 54dd08fd98..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.upgrade; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; -import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; - -import com.sleepycat.bind.tuple.IntegerBinding; -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.Cursor; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.Environment; -import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.Transaction; - -public class UpgraderTest extends AbstractUpgradeTestCase -{ - private Upgrader _upgrader; - - @Override - protected String getStoreDirectoryName() - { - return "bdbstore-v4"; - } - - @Override - public void setUp() throws Exception - { - super.setUp(); - _upgrader = new Upgrader(_environment, getVirtualHost()); - } - - private int getStoreVersion(Environment environment) - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - int storeVersion = -1; - Database versionDb = null; - Cursor cursor = null; - try - { - versionDb = environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig); - cursor = versionDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) - { - int version = IntegerBinding.entryToInt(key); - if (storeVersion < version) - { - storeVersion = version; - } - } - } - finally - { - if (cursor != null) - { - cursor.close(); - } - if (versionDb != null) - { - versionDb.close(); - } - } - return storeVersion; - } - - public void testUpgrade() throws Exception - { - assertEquals("Unexpected store version", -1, getStoreVersion(_environment)); - _upgrader.upgradeIfNecessary(); - assertEquals("Unexpected store version", BDBConfigurationStore.VERSION, getStoreVersion(_environment)); - assertContent(); - } - - public void testEmptyDatabaseUpgradeDoesNothing() throws Exception - { - File nonExistentStoreLocation = new File(TMP_FOLDER, getName()); - deleteDirectoryIfExists(nonExistentStoreLocation); - - nonExistentStoreLocation.mkdir(); - Environment emptyEnvironment = createEnvironment(nonExistentStoreLocation); - try - { - _upgrader = new Upgrader(emptyEnvironment, getVirtualHost()); - _upgrader.upgradeIfNecessary(); - - List<String> databaseNames = emptyEnvironment.getDatabaseNames(); - List<String> expectedDatabases = new ArrayList<String>(); - expectedDatabases.add(Upgrader.VERSION_DB_NAME); - assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames); - assertEquals("Unexpected store version", BDBConfigurationStore.VERSION, getStoreVersion(emptyEnvironment)); - - } - finally - { - emptyEnvironment.close(); - nonExistentStoreLocation.delete(); - } - } - - private void assertContent() - { - final ContentBinding contentBinding = ContentBinding.getInstance(); - CursorOperation contentCursorOperation = new CursorOperation() - { - - @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key, - DatabaseEntry value) - { - long id = LongBinding.entryToLong(key); - assertTrue("Unexpected id", id > 0); - byte[] content = contentBinding.entryToObject(value); - assertNotNull("Unexpected content", content); - } - }; - new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImplTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImplTest.java deleted file mode 100644 index 24a2ddb071..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImplTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.berkeleydb; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; -import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.TestFileUtils; -import org.apache.qpid.util.FileUtils; - -public class BDBVirtualHostImplTest extends QpidTestCase -{ - private File _storePath; - private VirtualHostNode<?> _node; - - @Override - public void setUp() throws Exception - { - super.setUp(); - Broker broker = BrokerTestHelper.createBrokerMock(); - - TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance(); - when(broker.getTaskExecutor()).thenReturn(taskExecutor); - when(broker.getChildExecutor()).thenReturn(taskExecutor); - - - _storePath = TestFileUtils.createTestDirectory(); - - _node = mock(VirtualHostNode.class); - when(_node.getParent(Broker.class)).thenReturn(broker); - when(_node.getModel()).thenReturn(BrokerModel.getInstance()); - when(_node.getTaskExecutor()).thenReturn(taskExecutor); - when(_node.getChildExecutor()).thenReturn(taskExecutor); - when(_node.getConfigurationStore()).thenReturn(mock(DurableConfigurationStore.class)); - when(_node.getId()).thenReturn(UUID.randomUUID()); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_storePath != null) - { - FileUtils.delete(_storePath, true); - } - } - finally - { - super.tearDown(); - } - } - - public void testValidateOnCreateForInvalidStorePath() throws Exception - { - String hostName = getTestName(); - File file = new File(_storePath + File.separator + hostName); - assertTrue("Empty file is not created", file.createNewFile()); - Map<String, Object> attributes = new HashMap<>(); - attributes.put(BDBVirtualHost.ID, UUID.randomUUID()); - attributes.put(BDBVirtualHost.TYPE, BDBVirtualHostImpl.VIRTUAL_HOST_TYPE); - attributes.put(BDBVirtualHost.NAME, hostName); - attributes.put(BDBVirtualHost.STORE_PATH, file.getAbsoluteFile()); - - BDBVirtualHostImpl host = new BDBVirtualHostImpl(attributes, _node); - try - { - host.create(); - fail("Cannot create DBD virtual host from existing empty file"); - } - catch (IllegalConfigurationException e) - { - assertTrue("Unexpected exception " + e.getMessage(), e.getMessage().startsWith("Cannot open virtual host message store")); - } - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java deleted file mode 100644 index 4fe2bdc97f..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.server.virtualhostnode.berkeleydb; - -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.security.AccessControlException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; -import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ConfiguredObjectFactory; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; - -public class BDBHARemoteReplicationNodeTest extends QpidTestCase -{ - private final org.apache.qpid.server.security.SecurityManager _mockSecurityManager = mock(SecurityManager.class); - - private Broker _broker; - private TaskExecutor _taskExecutor; - private BDBHAVirtualHostNode<?> _virtualHostNode; - private DurableConfigurationStore _configStore; - private ReplicatedEnvironmentFacade _facade; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - - _facade = mock(ReplicatedEnvironmentFacade.class); - - _broker = BrokerTestHelper.createBrokerMock(); - - _taskExecutor = new CurrentThreadTaskExecutor(); - _taskExecutor.start(); - when(_broker.getTaskExecutor()).thenReturn(_taskExecutor); - when(_broker.getChildExecutor()).thenReturn(_taskExecutor); - - _virtualHostNode = mock(BDBHAVirtualHostNode.class); - _configStore = mock(DurableConfigurationStore.class); - when(_virtualHostNode.getConfigurationStore()).thenReturn(_configStore); - - // Virtualhost needs the EventLogger from the SystemContext. - when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker); - doReturn(VirtualHostNode.class).when(_virtualHostNode).getCategoryClass(); - ConfiguredObjectFactory objectFactory = _broker.getObjectFactory(); - when(_virtualHostNode.getModel()).thenReturn(objectFactory.getModel()); - when(_virtualHostNode.getTaskExecutor()).thenReturn(_taskExecutor); - when(_virtualHostNode.getChildExecutor()).thenReturn(_taskExecutor); - - } - - public void testUpdateRole() - { - String remoteReplicationName = getName(); - BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName); - - remoteReplicationNode.setAttribute(BDBHARemoteReplicationNode.ROLE, remoteReplicationNode.getRole(), NodeRole.MASTER); - - verify(_facade).transferMasterAsynchronously(remoteReplicationName); - } - - public void testDelete() - { - String remoteReplicationName = getName(); - BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName); - - remoteReplicationNode.delete(); - - verify(_facade).removeNodeFromGroup(remoteReplicationName); - } - - // *************** ReplicationNode Access Control Tests *************** - - public void testUpdateDeniedByACL() - { - when(_broker.getSecurityManager()).thenReturn(_mockSecurityManager); - - String remoteReplicationName = getName(); - BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName); - - doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(remoteReplicationNode); - - assertNull(remoteReplicationNode.getDescription()); - - try - { - remoteReplicationNode.setAttribute(VirtualHost.DESCRIPTION, null, "My description"); - fail("Exception not thrown"); - } - catch (AccessControlException ace) - { - // PASS - } - } - - public void testDeleteDeniedByACL() - { - when(_broker.getSecurityManager()).thenReturn(_mockSecurityManager); - - String remoteReplicationName = getName(); - BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName); - - doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseDelete(remoteReplicationNode); - - assertNull(remoteReplicationNode.getDescription()); - - try - { - remoteReplicationNode.delete(); - fail("Exception not thrown"); - } - catch (AccessControlException ace) - { - // PASS - } - } - - private BDBHARemoteReplicationNode createRemoteReplicationNode(final String replicationNodeName) - { - Map<String, Object> attributes = new HashMap<>(); - attributes.put(BDBHARemoteReplicationNode.NAME, replicationNodeName); - attributes.put(BDBHARemoteReplicationNode.MONITOR, Boolean.FALSE); - - BDBHARemoteReplicationNodeImpl node = new BDBHARemoteReplicationNodeImpl(_virtualHostNode, attributes, _facade); - node.create(); - return node; - } - - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java deleted file mode 100644 index 6d740568ab..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java +++ /dev/null @@ -1,444 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhostnode.berkeleydb; - -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.*; - -import java.util.Collections; -import java.util.EnumSet; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.messages.HighAvailabilityMessages; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.SystemConfig; -import org.apache.qpid.server.store.berkeleydb.NoopConfigurationChangeListener; -import org.apache.qpid.test.utils.PortHelper; -import org.apache.qpid.test.utils.QpidTestCase; -import org.hamcrest.Description; -import org.mockito.ArgumentMatcher; - -/** - * Class to test that specific VHN operations result in the expected Operational Log message(s) being performed. - */ -public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase -{ - private BDBHAVirtualHostNodeTestHelper _helper; - private EventLogger _eventLogger; - private PortHelper _portHelper = new PortHelper(); - - @Override - protected void setUp() throws Exception - { - super.setUp(); - _helper = new BDBHAVirtualHostNodeTestHelper(getTestName()); - _eventLogger = mock(EventLogger.class); - SystemConfig<?> context = (SystemConfig<?>) _helper.getBroker().getParent(SystemConfig.class); - when(context.getEventLogger()).thenReturn(_eventLogger); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _helper.tearDown(); - } - finally - { - super.tearDown(); - } - - _portHelper.waitUntilAllocatedPortsAreFree(); - } - - public void testCreate() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - - _helper.assertNodeRole(node1, NodeRole.MASTER); - - // stop node to avoid running into race when role change is reported after we performed the check - node1.stop(); - - assertEquals("Unexpected VHN log subject", "[grp(/group)/vhn(/node1)] ", node1.getVirtualHostNodeLogSubject().getLogString()); - assertEquals("Unexpected group log subject", "[grp(/group)] ", node1.getGroupLogSubject().getLogString()); - - String expectedMessage = HighAvailabilityMessages.CREATED().toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.CREATED_LOG_HIERARCHY))); - - expectedMessage = HighAvailabilityMessages.ROLE_CHANGED(node1.getName(), node1.getAddress(), NodeRole.WAITING.name(), NodeRole.MASTER.name()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ROLE_CHANGED_LOG_HIERARCHY))); - } - - public void testDelete() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, NodeRole.MASTER); - - reset(_eventLogger); - - node1.delete(); - - String expectedMessage = HighAvailabilityMessages.DELETED().toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DELETED_LOG_HIERARCHY))); - - } - - public void testSetPriority() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, NodeRole.MASTER); - - reset(_eventLogger); - - node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, 10)); - - // make sure that task executor thread finishes all scheduled tasks - node1.stop(); - - String expectedMessage = HighAvailabilityMessages.PRIORITY_CHANGED("10").toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.PRIORITY_CHANGED_LOG_HIERARCHY))); - } - - public void testSetQuorumOverride() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, NodeRole.MASTER); - - reset(_eventLogger); - - node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); - - // make sure that task executor thread finishes all scheduled tasks - node1.stop(); - - String expectedMessage = HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED("1").toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY))); - } - - public void testSetDesignatedPrimary() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, NodeRole.MASTER); - - reset(_eventLogger); - - node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true)); - - // make sure that task executor thread finishes all scheduled tasks - node1.stop(); - - String expectedMessage = HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED("true").toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED_LOG_HIERARCHY))); - } - - public void testRemoteNodeAdded() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, NodeRole.MASTER); - - reset(_eventLogger); - - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - BDBHAVirtualHostNodeImpl node2 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node2Attributes); - _helper.awaitRemoteNodes(node1, 1); - - // make sure that task executor thread finishes all scheduled tasks - node2.stop(); - - // Verify ADDED message from node1 when it discovers node2 has been added - String expectedMessage = HighAvailabilityMessages.ADDED(node2.getName(), node2.getAddress()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ADDED_LOG_HIERARCHY))); - } - - public void testRemoteNodeRemoved() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber); - node1Attributes.put(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, NodeRole.MASTER); - - resetEventLogger(); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - BDBHAVirtualHostNodeImpl node2 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node2Attributes); - _helper.awaitRemoteNodes(node1, 1); - - reset(_eventLogger); - - node2.delete(); - _helper.awaitRemoteNodes(node1, 0); - - // make sure that task executor thread finishes all scheduled tasks - node1.stop(); - - String expectedMessage = HighAvailabilityMessages.REMOVED(node2.getName(), node2.getAddress()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.REMOVED_LOG_HIERARCHY))); - } - - public void testRemoteNodeDetached() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber); - node1Attributes.put(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - - final CountDownLatch remoteNodeAdded = new CountDownLatch(1); - node1.addChangeListener(new NoopConfigurationChangeListener() - { - @Override - public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) - { - if (child instanceof BDBHARemoteReplicationNode) - { - remoteNodeAdded.countDown(); - } - } - }); - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - BDBHAVirtualHostNodeImpl node2 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node2Attributes); - - assertTrue("Remote node was not added during expected period of time", remoteNodeAdded.await(10, TimeUnit.SECONDS)); - - - BDBHARemoteReplicationNodeImpl remoteNode = (BDBHARemoteReplicationNodeImpl)node1.getRemoteReplicationNodes().iterator().next(); - waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.REPLICA)); - - - reset(_eventLogger); - - // close remote node - node2.close(); - - - waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.UNREACHABLE)); - - // make sure that task executor thread finishes all scheduled tasks - node1.stop(); - - // verify that remaining node issues the DETACHED operational logging for remote node - String expectedMessage = HighAvailabilityMessages.LEFT(node2.getName(), node2.getAddress()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.LEFT_LOG_HIERARCHY))); - } - - - public void testRemoteNodeReAttached() throws Exception - { - int node1PortNumber = _portHelper.getNextAvailable(); - int node2PortNumber = _portHelper.getNextAvailable(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber); - node1Attributes.put(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, NodeRole.MASTER); - - resetEventLogger(); - - Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName); - BDBHAVirtualHostNodeImpl node2 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node2Attributes); - _helper.awaitRemoteNodes(node1, 1); - - BDBHARemoteReplicationNodeImpl remoteNode = (BDBHARemoteReplicationNodeImpl)node1.getRemoteReplicationNodes().iterator().next(); - waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.REPLICA)); - - node2.close(); - - waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.UNREACHABLE)); - - reset(_eventLogger); - - node2Attributes.put(BDBHAVirtualHostNode.PERMITTED_NODES, - node1Attributes.get(BDBHAVirtualHostNode.PERMITTED_NODES)); - node2 = (BDBHAVirtualHostNodeImpl)_helper.recoverHaVHN(node2.getId(), node2Attributes); - _helper.assertNodeRole(node2, NodeRole.REPLICA, NodeRole.MASTER); - waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.REPLICA, NodeRole.MASTER)); - - // make sure that task executor thread finishes all scheduled tasks - node1.stop(); - - final String expectedMessage = HighAvailabilityMessages.JOINED(node2.getName(), node2.getAddress()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.JOINED_LOG_HIERARCHY))); - } - - private void waitForRemoteNodeToAttainRole(BDBHARemoteReplicationNode remoteNode, EnumSet<NodeRole> desiredRoles) throws Exception - { - int counter = 0; - while (!desiredRoles.contains(remoteNode.getRole()) && counter<50) - { - Thread.sleep(100); - counter++; - } - } - - private EventLogger resetEventLogger() - { - EventLogger eventLogger = mock(EventLogger.class); - SystemConfig<?> context = (SystemConfig<?>) _helper.getBroker().getParent(SystemConfig.class); - when(context.getEventLogger()).thenReturn(eventLogger); - return eventLogger; - } - - class LogMessageMatcher extends ArgumentMatcher<LogMessage> - { - private String _expectedMessage; - private String _expectedMessageFailureDescription = null; - private String _expectedHierarchy; - private String _expectedHierarchyFailureDescription = null; - - public LogMessageMatcher(String expectedMessage, String expectedHierarchy) - { - _expectedMessage = expectedMessage; - _expectedHierarchy = expectedHierarchy; - } - - @Override - public boolean matches(Object argument) - { - LogMessage logMessage = (LogMessage)argument; - - boolean expectedMessageMatches = _expectedMessage.equals(logMessage.toString()); - if (!expectedMessageMatches) - { - _expectedMessageFailureDescription = "Expected message does not match. Expected: " + _expectedMessage + ", actual: " + logMessage.toString(); - } - boolean expectedHierarchyMatches = _expectedHierarchy.equals(logMessage.getLogHierarchy()); - if (!expectedHierarchyMatches) - { - _expectedHierarchyFailureDescription = "Expected hierarchy does not match. Expected: " + _expectedHierarchy + ", actual: " + logMessage.getLogHierarchy(); - } - - return expectedMessageMatches && expectedHierarchyMatches; - } - - @Override - public void describeTo(Description description) - { - if (_expectedMessageFailureDescription != null) - { - description.appendText(_expectedMessageFailureDescription); - } - if (_expectedHierarchyFailureDescription != null) - { - description.appendText(_expectedHierarchyFailureDescription); - } - } - } - - class LogSubjectMatcher extends ArgumentMatcher<LogSubject> - { - private LogSubject _logSubject; - private String _failureDescription = null; - - public LogSubjectMatcher(LogSubject logSubject) - { - _logSubject = logSubject; - } - - @Override - public boolean matches(Object argument) - { - final LogSubject logSubject = (LogSubject)argument; - final boolean foundAMatch = _logSubject.toLogString().equals(logSubject.toLogString()); - if (!foundAMatch) - { - _failureDescription = "LogSubject does not match. Expected: " + _logSubject.toLogString() + ", actual : " + logSubject.toLogString(); - } - return foundAMatch; - } - - @Override - public void describeTo(Description description) - { - if (_failureDescription != null) - { - description.appendText(_failureDescription); - } - } - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java deleted file mode 100644 index 9d0e905fc5..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhostnode.berkeleydb; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import com.sleepycat.je.rep.ReplicationConfig; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.SerializationConfig; - -import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.ConfiguredObjectFactory; -import org.apache.qpid.server.model.RemoteReplicationNode; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.UnresolvedConfiguredObject; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; -import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -/** - * Helper class to make the tests of BDB HA Virtual Host Nodes simpler and more concise. - */ -public class BDBHAVirtualHostNodeTestHelper -{ - private final String _testName; - private Broker<?> _broker; - private File _bdbStorePath; - private TaskExecutor _taskExecutor; - private final ConfiguredObjectFactory _objectFactory = BrokerModel.getInstance().getObjectFactory(); - private final Set<BDBHAVirtualHostNode<?>> _nodes = new HashSet<>(); - - public BDBHAVirtualHostNodeTestHelper(String testName) throws Exception - { - _testName = testName; - _broker = BrokerTestHelper.createBrokerMock(); - - _taskExecutor = new TaskExecutorImpl(); - _taskExecutor.start(); - when(_broker.getTaskExecutor()).thenReturn(_taskExecutor); - when(_broker.getChildExecutor()).thenReturn(_taskExecutor); - - - _bdbStorePath = new File(QpidTestCase.TMP_FOLDER, _testName + "." + System.currentTimeMillis()); - _bdbStorePath.deleteOnExit(); - } - - public void tearDown() throws Exception - { - try - { - Exception firstException = null; - for (VirtualHostNode<?> node : _nodes) - { - try - { - node.delete(); - } - catch(Exception e) - { - if (firstException != null) - { - firstException = e; - } - } - } - if (firstException != null) - { - throw firstException; - } - } - finally - { - if (_taskExecutor != null) - { - _taskExecutor.stopImmediately(); - } - if (_bdbStorePath != null) - { - FileUtils.delete(_bdbStorePath, true); - } - } - } - - public BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name) - { - for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes()) - { - if (remoteNode.getName().equals(name)) - { - return (BDBHARemoteReplicationNode<?>)remoteNode; - } - } - return null; - } - - public void awaitRemoteNodes(BDBHAVirtualHostNode<?> node, int expectedNodeNumber) throws InterruptedException - { - int counter = 0; - - @SuppressWarnings("rawtypes") - Collection<? extends RemoteReplicationNode> remoteNodes = null; - do - { - remoteNodes = node.getRemoteReplicationNodes(); - if (counter > 0) - { - Thread.sleep(100); - } - counter++; - } - // TODO: 30 seconds is quite a lot to wait, we need to reduce this limit - while(remoteNodes.size() != expectedNodeNumber && counter<100); - assertEquals("Unexpected node number", expectedNodeNumber, node.getRemoteReplicationNodes().size()); - } - - public void awaitForAttributeChange(ConfiguredObject<?> object, String name, Object expectedValue) throws InterruptedException - { - int awaitCounter = 0; - while(!object.equals(object.getAttribute(name)) && awaitCounter < 50) - { - Thread.sleep(100); - awaitCounter++; - } - assertEquals("Unexpected attribute " + name + " on " + object, expectedValue, object.getAttribute(name) ); - } - - public BDBHAVirtualHostNode<?> awaitAndFindNodeInRole(NodeRole desiredRole) throws InterruptedException - { - BDBHAVirtualHostNode<?> replica = null; - int findReplicaCount = 0; - while(replica == null) - { - replica = findNodeInRole(desiredRole); - if (replica == null) - { - Thread.sleep(100); - } - if (findReplicaCount > 50) - { - fail("Could not find a node in role " + desiredRole); - } - findReplicaCount++; - } - return replica; - } - - public BDBHAVirtualHostNode<?> findNodeInRole(NodeRole role) - { - for (BDBHAVirtualHostNode<?> node : _nodes) - { - if (role == node.getRole()) - { - return node; - } - } - return null; - } - - public BDBHAVirtualHostNode<?> createHaVHN(Map<String, Object> attributes) - { - @SuppressWarnings("unchecked") - BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) _objectFactory.create(VirtualHostNode.class, attributes, _broker); - _nodes.add(node); - return node; - } - - public BDBHAVirtualHostNode<?> recoverHaVHN(UUID id, Map<String, Object> attributes) - { - Map<String,UUID> parents = new HashMap<>(); - parents.put(Broker.class.getSimpleName(),_broker.getId()); - ConfiguredObjectRecordImpl record = new ConfiguredObjectRecordImpl(id, VirtualHostNode.class.getSimpleName(), attributes, parents ); - - @SuppressWarnings("unchecked") - UnresolvedConfiguredObject<BDBHAVirtualHostNodeImpl> unresolved = _objectFactory.recover(record, _broker); - BDBHAVirtualHostNode<?> node = unresolved.resolve(); - node.open(); - _nodes.add(node); - return node; - } - - public void assertNodeRole(BDBHAVirtualHostNode<?> node, NodeRole... roleName) throws InterruptedException - { - int iterationCounter = 0; - boolean inRole =false; - do - { - for (NodeRole role : roleName) - { - if (role == node.getRole()) - { - inRole = true; - break; - } - } - if (!inRole) - { - Thread.sleep(50); - } - iterationCounter++; - } - while(!inRole && iterationCounter<100); - assertTrue("Node " + node.getName() + " did not transit into role " + Arrays.toString(roleName) - + " Node role is " + node.getRole(), inRole); - } - - public BDBHAVirtualHostNode<?> createAndStartHaVHN(Map<String, Object> attributes) throws InterruptedException - { - BDBHAVirtualHostNode<?> node = createHaVHN(attributes); - return startNodeAndWait(node); - } - - public BDBHAVirtualHostNode<?> startNodeAndWait(BDBHAVirtualHostNode<?> node) throws InterruptedException - { - node.start(); - assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA); - assertEquals("Unexpected node state", State.ACTIVE, node.getState()); - return node; - } - - public String getMessageStorePath() - { - return _bdbStorePath.getAbsolutePath(); - } - - public Broker getBroker() - { - return _broker; - } - - public Map<String, Object> createNodeAttributes(String nodeName, String groupName, String address, - String helperAddress, String helperNodeNode, int... ports) - throws Exception - { - Map<String, Object> node1Attributes = new HashMap<String, Object>(); - node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); - node1Attributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE); - node1Attributes.put(BDBHAVirtualHostNode.NAME, nodeName); - node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); - node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, address); - node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); - node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, getMessageStorePath() + File.separator + nodeName); - if (address.equals(helperAddress)) - { - node1Attributes.put(BDBHAVirtualHostNode.PERMITTED_NODES, getPermittedNodes(ports)); - } - else - { - node1Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, helperNodeNode); - } - - Map<String, String> context = new HashMap<String, String>(); - context.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); - context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); - - if (ports != null) - { - String bluePrint = getBlueprint(); - node1Attributes.put(AbstractVirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, bluePrint); - } - - node1Attributes.put(BDBHAVirtualHostNode.CONTEXT, context); - - return node1Attributes; - } - - public static String getBlueprint() throws Exception - { - Map<String,Object> bluePrint = new HashMap<>(); - bluePrint.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE); - - StringWriter writer = new StringWriter(); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); - mapper.writeValue(writer, bluePrint); - return writer.toString(); - } - - public static List<String> getPermittedNodes(int[] ports) - { - List<String> permittedNodes = new ArrayList<String>(); - for (int port:ports) - { - permittedNodes.add("localhost:" + port); - } - return permittedNodes; - } - - public void awaitForVirtualhost(final VirtualHostNode<?> node, final int wait) - { - long endTime = System.currentTimeMillis() + wait; - do - { - if(node.getVirtualHost() != null) - { - return; - } - try - { - Thread.sleep(100); - } - catch (InterruptedException e) - { - // ignore - } - } - while(System.currentTimeMillis() < endTime); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeTest.java deleted file mode 100644 index 812d9a3b19..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhostnode.berkeleydb; - -import static org.mockito.Mockito.when; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; -import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.TestFileUtils; -import org.apache.qpid.util.FileUtils; - -public class BDBVirtualHostNodeTest extends QpidTestCase -{ - private Broker<?> _broker; - private File _storePath; - - @Override - public void setUp() throws Exception - { - super.setUp(); - _broker = BrokerTestHelper.createBrokerMock(); - TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance(); - when(_broker.getTaskExecutor()).thenReturn(taskExecutor); - when(_broker.getChildExecutor()).thenReturn(taskExecutor); - - _storePath = TestFileUtils.createTestDirectory(); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_storePath != null) - { - FileUtils.delete(_storePath, true); - } - } - finally - { - super.tearDown(); - } - } - - public void testValidateOnCreateForInvalidStorePath() throws Exception - { - String nodeName = getTestName(); - File file = new File(_storePath + File.separator + nodeName); - assertTrue("Empty file is not created", file.createNewFile()); - Map<String, Object> attributes = new HashMap<>(); - attributes.put(BDBVirtualHostNode.ID, UUID.randomUUID()); - attributes.put(BDBVirtualHostNode.TYPE, BDBVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE); - attributes.put(BDBVirtualHostNode.NAME, nodeName); - attributes.put(BDBVirtualHostNode.STORE_PATH, file.getAbsolutePath()); - - BDBVirtualHostNodeImpl node = new BDBVirtualHostNodeImpl(attributes, _broker); - try - { - node.create(); - fail("Cannot create DBD node from existing empty file"); - } - catch (IllegalConfigurationException e) - { - assertTrue("Unexpected exception " + e.getMessage(), e.getMessage().startsWith("Cannot open node configuration store")); - } - } - -} diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb Binary files differdeleted file mode 100644 index cfc1f05d28..0000000000 --- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb +++ /dev/null diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt deleted file mode 100644 index a7e754f967..0000000000 --- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt +++ /dev/null @@ -1,5 +0,0 @@ -The bdbstore v5 data were obtained by upgrading the bdbstore v4 data as part of running -test UpgradeFrom4to5Test#testPerformUpgradeWithHandlerAnsweringNo. - -The rationale for not using BDBStoreUpgradeTestPreparer in this case is that we need chunked content. -Current implementation of BDBMessageStore only stores messages in one chunk.
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb Binary files differdeleted file mode 100644 index cfc1f05d28..0000000000 --- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb +++ /dev/null diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb Binary files differdeleted file mode 100644 index 4b45ff61e6..0000000000 --- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb +++ /dev/null diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt deleted file mode 100644 index efb929c944..0000000000 --- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt +++ /dev/null @@ -1,6 +0,0 @@ -The bdbstore v7 data was obtained by running 0.26 and: - -* creating an exchange 'myexch' of type direct -* creating queues 'queue1' and 'queue2' -* binding 'queue1' to 'myexch' and 'amq.direct' using binding key 'queue1' -* binding 'queue2' to amq.fanout only using binding key 'queue2'
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb Binary files differdeleted file mode 100644 index 4957f86e1a..0000000000 --- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb +++ /dev/null diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb Binary files differdeleted file mode 100644 index 991367019f..0000000000 --- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb +++ /dev/null |
