summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/test')
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java87
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java795
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java48
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java82
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java415
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java432
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvHomeRegistryTest.java68
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopConfigurationChangeListener.java53
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java153
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java1057
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java70
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java67
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java181
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java83
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java344
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java445
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java378
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java102
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java146
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImplTest.java109
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java160
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java444
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java350
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeTest.java95
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdbbin1366145 -> 0 bytes
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt5
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdbbin1366145 -> 0 bytes
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdbbin1336563 -> 0 bytes
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt6
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdbbin4857 -> 0 bytes
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdbbin2576 -> 0 bytes
31 files changed, 0 insertions, 6175 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
deleted file mode 100644
index 67804dc234..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import junit.framework.TestCase;
-
-import org.apache.qpid.framing.AMQShortString;
-
-/**
- * Tests for {@code AMQShortStringEncoding} including corner cases when string
- * is null or over 127 characters in length
- */
-public class AMQShortStringEncodingTest extends TestCase
-{
-
- public void testWriteReadNullValues()
- {
- // write into tuple output
- TupleOutput tupleOutput = new TupleOutput();
- AMQShortStringEncoding.writeShortString(null, tupleOutput);
- byte[] data = tupleOutput.getBufferBytes();
-
- // read from tuple input
- TupleInput tupleInput = new TupleInput(data);
- AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
- assertNull("Expected null but got " + result, result);
- }
-
- public void testWriteReadShortStringWithLengthOver127()
- {
- AMQShortString value = createString('a', 128);
-
- // write into tuple output
- TupleOutput tupleOutput = new TupleOutput();
- AMQShortStringEncoding.writeShortString(value, tupleOutput);
- byte[] data = tupleOutput.getBufferBytes();
-
- // read from tuple input
- TupleInput tupleInput = new TupleInput(data);
- AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
- assertEquals("Expected " + value + " but got " + result, value, result);
- }
-
- public void testWriteReadShortStringWithLengthLess127()
- {
- AMQShortString value = new AMQShortString("test");
-
- // write into tuple output
- TupleOutput tupleOutput = new TupleOutput();
- AMQShortStringEncoding.writeShortString(value, tupleOutput);
- byte[] data = tupleOutput.getBufferBytes();
-
- // read from tuple input
- TupleInput tupleInput = new TupleInput(data);
- AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
- assertEquals("Expected " + value + " but got " + result, value, result);
- }
-
- private AMQShortString createString(char ch, int length)
- {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++)
- {
- sb.append(ch);
- }
- return new AMQShortString(sb.toString());
- }
-
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
deleted file mode 100644
index ee990d3211..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ /dev/null
@@ -1,795 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.sleepycat.je.Durability;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.RemoteReplicationNode;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
-import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
-import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeTestHelper;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole;
-import org.apache.qpid.test.utils.PortHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.TestFileUtils;
-import org.apache.qpid.util.FileUtils;
-
-public class BDBHAVirtualHostNodeTest extends QpidTestCase
-{
- private BDBHAVirtualHostNodeTestHelper _helper;
- private PortHelper _portHelper = new PortHelper();
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- _helper = new BDBHAVirtualHostNodeTestHelper(getTestName());
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- try
- {
- _helper.tearDown();
- }
- finally
- {
- super.tearDown();
- }
-
- _portHelper.waitUntilAllocatedPortsAreFree();
- }
-
- public void testCreateAndActivateVirtualHostNode() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- String messageStorePath = (String)attributes.get(BDBHAVirtualHostNode.STORE_PATH);
- String repStreamTimeout = "2 h";
- Map<String,String> context = (Map<String,String>)attributes.get(BDBHAVirtualHostNode.CONTEXT);
- context.put(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout);
- BDBHAVirtualHostNode<?> node = _helper.createHaVHN(attributes);
-
- node.start();
- _helper.assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA);
-
- assertEquals("Unexpected node state", State.ACTIVE, node.getState());
-
- DurableConfigurationStore store = node.getConfigurationStore();
- assertNotNull(store);
-
- BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) store;
- ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment();
- ReplicationConfig replicationConfig = environment.getRepConfig();
-
- assertEquals(nodeName, environment.getNodeName());
- assertEquals(groupName, environment.getGroup().getName());
- assertEquals(helperAddress, replicationConfig.getNodeHostPort());
- assertEquals(helperAddress, replicationConfig.getHelperHosts());
-
- assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
- assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
-
- _helper.awaitForVirtualhost(node, 30000);
- VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost();
- assertNotNull("Virtual host child was not added", virtualHost);
- assertEquals("Unexpected virtual host name", groupName, virtualHost.getName());
- assertEquals("Unexpected virtual host store", bdbConfigurationStore.getMessageStore(), virtualHost.getMessageStore());
- assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState());
-
- node.stop();
- assertEquals("Unexpected state returned after stop", State.STOPPED, node.getState());
- assertEquals("Unexpected state", State.STOPPED, node.getState());
-
- assertNull("Virtual host is not destroyed", node.getVirtualHost());
-
- node.delete();
- assertEquals("Unexpected state returned after delete", State.DELETED, node.getState());
- assertEquals("Unexpected state", State.DELETED, node.getState());
- assertFalse("Store still exists " + messageStorePath, new File(messageStorePath).exists());
- }
-
- public void testMutableAttributes() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(attributes);
-
- BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) node.getConfigurationStore();
- ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment();
-
- assertEquals("Unexpected node priority value before mutation", 1, environment.getRepMutableConfig().getNodePriority());
- assertFalse("Unexpected designated primary value before mutation", environment.getRepMutableConfig().getDesignatedPrimary());
- assertEquals("Unexpected electable group override value before mutation", 0, environment.getRepMutableConfig().getElectableGroupSizeOverride());
-
- node.setAttribute(BDBHAVirtualHostNode.PRIORITY, 1, 2);
- node.setAttribute(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, false, true);
- node.setAttribute(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 0, 1);
-
- assertEquals("Unexpected node priority value after mutation", 2, environment.getRepMutableConfig().getNodePriority());
- assertTrue("Unexpected designated primary value after mutation", environment.getRepMutableConfig().getDesignatedPrimary());
- assertEquals("Unexpected electable group override value after mutation", 1, environment.getRepMutableConfig().getElectableGroupSizeOverride());
-
- assertNotNull("Join time should be set", node.getJoinTime());
- assertNotNull("Last known replication transaction id should be set", node.getLastKnownReplicationTransactionId());
- }
-
- public void testTransferMasterToSelf() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- int node3PortNumber = _portHelper.getNextAvailable();
-
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber);
- _helper.createAndStartHaVHN(node1Attributes);
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- _helper.createAndStartHaVHN(node2Attributes);
-
- Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
- _helper.createAndStartHaVHN(node3Attributes);
-
- BDBHAVirtualHostNode<?> replica = _helper.awaitAndFindNodeInRole(NodeRole.REPLICA);
-
- replica.setAttribute(BDBHAVirtualHostNode.ROLE, replica.getRole(), NodeRole.MASTER);
-
- _helper.assertNodeRole(replica, NodeRole.MASTER);
- }
-
- public void testTransferMasterToRemoteReplica() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- int node3PortNumber = _portHelper.getNextAvailable();
-
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress,
- helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber);
- BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
-
- final AtomicReference<RemoteReplicationNode<?>> lastSeenReplica = new AtomicReference<>();
- final CountDownLatch remoteNodeLatch = new CountDownLatch(2);
- node1.addChangeListener(new NoopConfigurationChangeListener()
- {
- @Override
- public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
- {
- if (child instanceof RemoteReplicationNode)
- {
- remoteNodeLatch.countDown();
- lastSeenReplica.set((RemoteReplicationNode<?>)child);
- }
- }
- });
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
-
- Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
- BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes);
-
- assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS));
-
- BDBHARemoteReplicationNodeImpl replicaRemoteNode = (BDBHARemoteReplicationNodeImpl)lastSeenReplica.get();
- _helper.awaitForAttributeChange(replicaRemoteNode, BDBHARemoteReplicationNodeImpl.ROLE, NodeRole.REPLICA);
-
- replicaRemoteNode.setAttributes(Collections.<String,Object>singletonMap(BDBHARemoteReplicationNode.ROLE, NodeRole.MASTER));
-
- BDBHAVirtualHostNode<?> replica = replicaRemoteNode.getName().equals(node2.getName())? node2 : node3;
- _helper.assertNodeRole(replica, NodeRole.MASTER);
- }
-
- public void testMutatingRoleWhenNotReplica_IsDisallowed() throws Exception
- {
- int nodePortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + nodePortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, nodePortNumber);
- BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(node1Attributes);
- _helper.assertNodeRole(node, NodeRole.MASTER);
-
- try
- {
- node.setAttributes(Collections.<String,Object>singletonMap(BDBHAVirtualHostNode.ROLE, NodeRole.REPLICA));
- fail("Role mutation should fail");
- }
- catch(IllegalStateException e)
- {
- // PASS
- }
- }
-
-
- public void testRemoveReplicaNode() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- int node3PortNumber = _portHelper.getNextAvailable();
-
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- assertTrue(_portHelper.isPortAvailable(node1PortNumber));
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber);
- _helper.createAndStartHaVHN(node1Attributes);
-
- assertTrue(_portHelper.isPortAvailable(node2PortNumber));
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- _helper.createAndStartHaVHN(node2Attributes);
-
- assertTrue(_portHelper.isPortAvailable(node3PortNumber));
-
- Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
- _helper.createAndStartHaVHN(node3Attributes);
-
-
- BDBHAVirtualHostNode<?> master = _helper.awaitAndFindNodeInRole(NodeRole.MASTER);
- _helper.awaitRemoteNodes(master, 2);
-
- BDBHAVirtualHostNode<?> replica = _helper.awaitAndFindNodeInRole(NodeRole.REPLICA);
- _helper.awaitRemoteNodes(replica, 2);
-
- assertNotNull("Remote node " + replica.getName() + " is not found", _helper.findRemoteNode(master, replica.getName()));
- replica.delete();
-
- _helper.awaitRemoteNodes(master, 1);
-
- assertNull("Remote node " + replica.getName() + " is not found", _helper.findRemoteNode(master, replica.getName()));
- }
-
- public void testSetSynchronizationPolicyAttributesOnVirtualHost() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> nodeAttributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNode<?> node = _helper.createHaVHN(nodeAttributes);
-
- node.start();
- _helper.assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA);
- assertEquals("Unexpected node state", State.ACTIVE, node.getState());
-
- _helper.awaitForVirtualhost(node,30000);
- BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost();
- assertNotNull("Virtual host is not created", virtualHost);
-
- _helper.awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, true);
-
- assertEquals("Unexpected local transaction synchronization policy", "SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
- assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
- assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync());
-
- Map<String, Object> virtualHostAttributes = new HashMap<String,Object>();
- virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "WRITE_NO_SYNC");
- virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
- virtualHost.setAttributes(virtualHostAttributes);
-
- virtualHost.stop();
- virtualHost.start();
-
- assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
- assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
- assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync());
- try
- {
- virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID"));
- fail("Invalid synchronization policy is set");
- }
- catch(IllegalArgumentException e)
- {
- //pass
- }
-
- try
- {
- virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID"));
- fail("Invalid synchronization policy is set");
- }
- catch(IllegalArgumentException e)
- {
- //pass
- }
-
- }
-
- public void testNotPermittedNodeIsNotAllowedToConnect() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- int node3PortNumber = _portHelper.getNextAvailable();
-
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber);
- BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
-
- Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
- try
- {
- _helper.createHaVHN(node3Attributes);
- fail("The VHN should not be permitted to join the group");
- }
- catch(IllegalConfigurationException e)
- {
- assertEquals("Unexpected exception message", String.format("Node from '%s' is not permitted!", "localhost:" + node3PortNumber), e.getMessage());
- }
- }
-
- public void testCurrentNodeCannotBeRemovedFromPermittedNodeList() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- int node3PortNumber = _portHelper.getNextAvailable();
-
- String node1Address = "localhost:" + node1PortNumber;
- String node2Address = "localhost:" + node2PortNumber;
- String node3Address = "localhost:" + node3PortNumber;
-
- String groupName = "group";
- String node1Name = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(node1Name, groupName, node1Address, node1Address, node1Name, node1PortNumber, node2PortNumber, node3PortNumber);
- BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, node2Address, node1Address, node1Name);
- BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
-
- Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, node3Address, node1Address, node1Name);
- BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes);
-
- _helper.awaitRemoteNodes(node1, 2);
-
- // Create new "proposed" permitted nodes list with a current node missing
- List<String> amendedPermittedNodes = new ArrayList<String>();
- amendedPermittedNodes.add(node1Address);
- amendedPermittedNodes.add(node2Address);
-
- // Try to update the permitted nodes attributes using the new list
- try
- {
- node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes));
- fail("Operation to remove current group node from permitted nodes should have failed");
- }
- catch(IllegalArgumentException e)
- {
- assertEquals("Unexpected exception message", String.format("The current group node '%s' cannot be removed from '%s' as its already a group member", node3Address, BDBHAVirtualHostNode.PERMITTED_NODES), e.getMessage());
- }
- }
-
- public void testPermittedNodesAttributeModificationConditions() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- int node3PortNumber = _portHelper.getNextAvailable();
- int node4PortNumber = _portHelper.getNextAvailable();
- int node5PortNumber = _portHelper.getNextAvailable();
-
- String node1Address = "localhost:" + node1PortNumber;
- String node2Address = "localhost:" + node2PortNumber;
- String node3Address = "localhost:" + node3PortNumber;
- String node4Address = "localhost:" + node4PortNumber;
- String node5Address = "localhost:" + node5PortNumber;
-
- String groupName = "group";
- String node1Name = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(node1Name, groupName, node1Address, node1Address, node1Name, node1PortNumber, node2PortNumber, node3PortNumber);
- BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, node2Address, node1Address, node1Name);
- BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
-
- Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, node3Address, node1Address, node1Name);
- BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes);
-
- _helper.awaitRemoteNodes(node1, 2);
-
- // Create new "proposed" permitted nodes list for update
- List<String> amendedPermittedNodes = new ArrayList<String>();
- amendedPermittedNodes.add(node1Address);
- amendedPermittedNodes.add(node2Address);
- amendedPermittedNodes.add(node3Address);
- amendedPermittedNodes.add(node4Address);
-
- // Try to update the permitted nodes attributes using the new list on REPLICA - should fail
- BDBHAVirtualHostNode<?> nonMasterNode = _helper.findNodeInRole(NodeRole.REPLICA);
- try
- {
- nonMasterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes));
- fail("Operation to update permitted nodes should have failed from non MASTER node");
- }
- catch(IllegalArgumentException e)
- {
- assertEquals("Unexpected exception message", String.format("Attribute '%s' can only be set on '%s' node or node in '%s' or '%s' state", BDBHAVirtualHostNode.PERMITTED_NODES, NodeRole.MASTER, State.STOPPED, State.ERRORED), e.getMessage());
- }
-
- // Try to update the permitted nodes attributes using the new list on MASTER - should succeed
- BDBHAVirtualHostNode<?> masterNode = _helper.findNodeInRole(NodeRole.MASTER);
- masterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes));
-
- // Try to update the permitted nodes attributes using the new list on a STOPPED node - should succeed
- nonMasterNode.stop();
- amendedPermittedNodes.add(node5Address);
- nonMasterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes));
- }
-
- public void testIntruderProtection() throws Exception
- {
- int nodePortNumber = _portHelper.getNextAvailable();
- int intruderPortNumber = _portHelper.getNextAvailable();
-
- String helperAddress = "localhost:" + nodePortNumber;
- String groupName = "group";
- String nodeName = "node";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, nodePortNumber, intruderPortNumber);
- BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(node1Attributes);
-
- Map<String, Object> intruderAttributes = _helper.createNodeAttributes("intruder", groupName, "localhost:" + intruderPortNumber, helperAddress, nodeName);
- intruderAttributes.put(BDBHAVirtualHostNode.PRIORITY, 0);
- BDBHAVirtualHostNode<?> intruder = _helper.createAndStartHaVHN(intruderAttributes);
-
- final CountDownLatch stopLatch = new CountDownLatch(1);
- ConfigurationChangeListener listener = new NoopConfigurationChangeListener()
- {
- @Override
- public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
- {
- if (newState == State.ERRORED)
- {
- stopLatch.countDown();
- }
- }
- };
- node.addChangeListener(listener);
-
- List<String> permittedNodes = new ArrayList<String>();
- permittedNodes.add(helperAddress);
- node.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes));
-
- assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS));
-
- // Try top re start the ERRORED node and ensure exception is thrown
- try
- {
- node.start();
- fail("Restart of node should have thrown exception");
- }
- catch (IllegalStateException ise)
- {
- assertEquals("Unexpected exception when restarting node post intruder detection", "Intruder node detected: " + "localhost:" + intruderPortNumber, ise.getMessage());
- }
- _helper.awaitForAttributeChange(node, AbstractConfiguredObject.STATE, State.ERRORED);
- }
-
- public void testIntruderProtectionInManagementMode() throws Exception
- {
- int nodePortNumber = _portHelper.getNextAvailable();
- int intruderPortNumber = _portHelper.getNextAvailable();
-
- String helperAddress = "localhost:" + nodePortNumber;
- String groupName = "group";
- String nodeName = "node";
-
- Map<String, Object> nodeAttributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, nodePortNumber, intruderPortNumber);
- BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(nodeAttributes);
-
- Map<String, Object> intruderAttributes = _helper.createNodeAttributes("intruder", groupName, "localhost:" + intruderPortNumber, helperAddress, nodeName);
- intruderAttributes.put(BDBHAVirtualHostNode.PRIORITY, 0);
- BDBHAVirtualHostNode<?> intruder = _helper.createAndStartHaVHN(intruderAttributes);
-
- final CountDownLatch stopLatch = new CountDownLatch(1);
- ConfigurationChangeListener listener = new NoopConfigurationChangeListener()
- {
- @Override
- public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
- {
- if (newState == State.ERRORED)
- {
- stopLatch.countDown();
- }
- }
- };
- node.addChangeListener(listener);
-
- List<String> permittedNodes = new ArrayList<String>();
- permittedNodes.add(helperAddress);
- node.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes));
-
- assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS));
-
- // test that if management mode is enabled then the node can start without exception
- when(_helper.getBroker().isManagementMode()).thenReturn(true);
- node.start();
-
- _helper.awaitForAttributeChange(node, AbstractConfiguredObject.STATE, State.ERRORED);
- }
-
- public void testPermittedNodesChangedOnReplicaNodeOnlyOnceAfterBeingChangedOnMaster() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
-
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber);
- BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- node2Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0);
- BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
- assertEquals("Unexpected role", NodeRole.REPLICA, node2.getRole());
- _helper.awaitRemoteNodes(node2, 1);
-
- BDBHARemoteReplicationNode<?> remote = _helper.findRemoteNode(node2, node1.getName());
-
- final AtomicInteger permittedNodesChangeCounter = new AtomicInteger();
- final CountDownLatch _permittedNodesLatch = new CountDownLatch(1);
- node2.addChangeListener(new NoopConfigurationChangeListener()
- {
- @Override
- public void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
- {
- if (attributeName.equals(BDBHAVirtualHostNode.PERMITTED_NODES))
- {
- permittedNodesChangeCounter.incrementAndGet();
- _permittedNodesLatch.countDown();
- }
- }
- });
- List<String> permittedNodes = new ArrayList<>(node1.getPermittedNodes());
- permittedNodes.add("localhost:5000");
- node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes));
-
- assertTrue("Permitted nodes were not changed on Replica", _permittedNodesLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Not the same permitted nodes", new HashSet<>(node1.getPermittedNodes()), new HashSet<>(node2.getPermittedNodes()));
- assertEquals("Unexpected counter of changes permitted nodes", 1, permittedNodesChangeCounter.get());
-
- // change the order of permitted nodes
- Collections.swap(permittedNodes, 0, 2);
- node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes));
-
- // make sure that node2 onNodeState was invoked by performing transaction on master and making sure that it was replicated
- performTransactionAndAwaitForRemoteNodeToGetAware(node1, remote);
-
- // perform transaction second time because permitted nodes are changed after last transaction id
- performTransactionAndAwaitForRemoteNodeToGetAware(node1, remote);
- assertEquals("Unexpected counter of changes permitted nodes", 1, permittedNodesChangeCounter.get());
- }
-
- private void performTransactionAndAwaitForRemoteNodeToGetAware(BDBHAVirtualHostNode<?> node1, BDBHARemoteReplicationNode<?> remote) throws InterruptedException
- {
- new DatabasePinger().pingDb(((BDBConfigurationStore)node1.getConfigurationStore()).getEnvironmentFacade());
-
- int waitCounter = 100;
- while ( remote.getLastKnownReplicationTransactionId() != node1.getLastKnownReplicationTransactionId() && (waitCounter--) != 0)
- {
- Thread.sleep(100l);
- }
- assertEquals("Last transaction was not replicated", new Long(remote.getLastKnownReplicationTransactionId()), node1.getLastKnownReplicationTransactionId() );
- }
-
- public void testIntruderConnected() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
-
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
-
- final CountDownLatch stopLatch = new CountDownLatch(1);
- ConfigurationChangeListener listener = new NoopConfigurationChangeListener()
- {
- @Override
- public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
- {
- if (newState == State.ERRORED)
- {
- stopLatch.countDown();
- }
- }
- };
- node1.addChangeListener(listener);
-
- String node2Name = "node2";
- File environmentPathFile = new File(_helper.getMessageStorePath() + File.separator + node2Name);
- environmentPathFile.mkdirs();
-
- ReplicationConfig replicationConfig = new ReplicationConfig(groupName, node2Name, "localhost:" + node2PortNumber );
- replicationConfig.setHelperHosts(helperAddress);
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setDurability(Durability.parse((String) node1Attributes.get(BDBHAVirtualHostNode.DURABILITY)));
-
- ReplicatedEnvironment intruder = null;
- String originalThreadName = Thread.currentThread().getName();
- try
- {
- intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
- }
- finally
- {
- try
- {
- if (intruder != null)
- {
- intruder.close();
- }
- }
- finally
- {
- Thread.currentThread().setName(originalThreadName);
- }
- }
-
- assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(20, TimeUnit.SECONDS));
- }
-
- public void testValidateOnCreateForNonExistingHelperNode() throws Exception
- {
- int node1PortNumber = findFreePort();
- int node2PortNumber = getNextAvailable(node1PortNumber + 1);
-
-
- Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber,
- "localhost:" + node2PortNumber, "node2", node1PortNumber, node1PortNumber, node2PortNumber);
- try
- {
- _helper.createAndStartHaVHN(attributes);
- fail("Node creation should fail because of invalid helper address");
- }
- catch(IllegalConfigurationException e)
- {
- assertEquals("Unexpected exception on connection to non-existing helper address",
- String.format("Cannot connect to existing node '%s' at '%s'", "node2", "localhost:" + node2PortNumber), e.getMessage());
- }
- }
-
- public void testValidateOnCreateForAlreadyBoundAddress() throws Exception
- {
- int node1PortNumber = findFreePort();
-
- ServerSocket serverSocket = null;
- try
- {
- serverSocket = new ServerSocket();
- serverSocket.setReuseAddress(true);
- serverSocket.bind(new InetSocketAddress("localhost", node1PortNumber));
-
-
- Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber,
- "localhost:" + node1PortNumber, "node2", node1PortNumber, node1PortNumber);
- try
- {
- _helper.createAndStartHaVHN(attributes);
- fail("Node creation should fail because of invalid address");
- }
- catch(IllegalConfigurationException e)
- {
- assertEquals("Unexpected exception on attempt to create node with already bound address",
- String.format("Cannot bind to address '%s'. Address is already in use.", "localhost:" + node1PortNumber), e.getMessage());
- }
- }
- finally
- {
- if (serverSocket != null)
- {
- serverSocket.close();
- }
- }
- }
-
- public void testValidateOnCreateForInvalidStorePath() throws Exception
- {
- int node1PortNumber = findFreePort();
-
- File storeBaseFolder = TestFileUtils.createTestDirectory();
- File file = new File(storeBaseFolder, getTestName());
- file.createNewFile();
- File storePath = new File(file, "test");
- try
- {
- Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber,
- "localhost:" + node1PortNumber, "node2", node1PortNumber, node1PortNumber);
- attributes.put(BDBHAVirtualHostNode.STORE_PATH, storePath.getAbsoluteFile());
- try
- {
- _helper.createAndStartHaVHN(attributes);
- fail("Node creation should fail because of invalid store path");
- }
- catch (IllegalConfigurationException e)
- {
- assertEquals("Unexpected exception on attempt to create environment in invalid location",
- String.format("Store path '%s' is not a folder", storePath.getAbsoluteFile()), e.getMessage());
- }
- }
- finally
- {
- FileUtils.delete(storeBaseFolder, true);
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
deleted file mode 100644
index a58b1e7c2e..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.store.AbstractDurableConfigurationStoreTestCase;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode;
-
-public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase
-{
- @Override
- protected VirtualHostNode createVirtualHostNode(String storeLocation, ConfiguredObjectFactory factory)
- {
- final BDBVirtualHostNode parent = mock(BDBVirtualHostNode.class);
- when(parent.getStorePath()).thenReturn(storeLocation);
- return parent;
- }
-
- @Override
- protected DurableConfigurationStore createConfigStore() throws Exception
- {
- return new BDBConfigurationStore(VirtualHost.class);
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
deleted file mode 100644
index ee0edb5e59..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
-import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
-
-public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
-{
- /*
- * Notes on calculation of quota limits.
- *
- * 150 32kb messages is approximately 4.8MB which is greater than
- * OVERFULL_SIZE.
- *
- * We deliberately use settings that force BDB to use multiple log files, so
- * that when one or more of them are subsequently cleaned (following message
- * consumption) the actual size on disk is reduced.
- */
-
- private static final String MAX_BDB_LOG_SIZE = "1000000"; // ~1MB
-
- private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 150;
-
- private static final long OVERFULL_SIZE = 4000000; // ~4MB
- private static final long UNDERFULL_SIZE = 3500000; // ~3.5MB
-
- @Override
- protected int getNumberOfMessagesToFillStore()
- {
- return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE;
- }
-
- @Override
- protected VirtualHost createVirtualHost(String storeLocation)
- {
- final BDBVirtualHost parent = mock(BDBVirtualHost.class);
- Map<String, String> contextMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE);
- when(parent.getContext()).thenReturn(contextMap);
- when(parent.getContextKeys(false)).thenReturn(contextMap.keySet());
- when(parent.getContextValue(eq(String.class),eq("je.log.fileMax"))).thenReturn(MAX_BDB_LOG_SIZE);
- when(parent.getStorePath()).thenReturn(storeLocation);
- when(parent.getStoreOverfullSize()).thenReturn(OVERFULL_SIZE);
- when(parent.getStoreUnderfullSize()).thenReturn(UNDERFULL_SIZE);
- return parent;
- }
-
-
- @Override
- protected MessageStore createStore() throws Exception
- {
- MessageStore store = new BDBMessageStore();
- return store;
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
deleted file mode 100644
index 3f8c1a7a99..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MessagePublishInfo;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
-import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
-import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
-import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
-import org.apache.qpid.server.store.MessageHandle;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreTestCase;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.util.FileUtils;
-
-/**
- * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
- * the BDB Store as well as additional tests specific to the BDB store-implementation.
- */
-public class BDBMessageStoreTest extends MessageStoreTestCase
-{
- private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
-
- private String _storeLocation;
-
- @Override
- protected void tearDown() throws Exception
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- deleteStoreIfExists();
- }
- }
-
- /**
- * Tests that message metadata and content are successfully read back from a
- * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
- * verify their ability to co-exist within the store and be successful retrieved.
- */
- public void testBDBMessagePersistence() throws Exception
- {
- MessageStore bdbStore = getStore();
-
- // Create content ByteBuffers.
- // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
- // Use a single chunk for the 0-10 message as per broker behaviour.
- String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
-
- ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
- ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
-
- ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
- int bodySize = completeContentBody_0_10.limit();
-
- /*
- * Create and insert a 0-8 message (metadata and multi-chunk content)
- */
- MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
- BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
-
- ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
-
- MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
- MessageHandle<MessageMetaData> messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8);
-
- long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
-
- messageHandle_0_8.addContent(firstContentBytes_0_8);
- messageHandle_0_8.addContent(secondContentBytes_0_8);
- final StoredMessage<MessageMetaData> storedMessage_0_8 = messageHandle_0_8.allContentAdded();
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
- ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore();
-
- /*
- * Create and insert a 0-10 message (metadata and content)
- */
- MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
- DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
- Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
-
- MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
-
- MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
- MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10);
-
- long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
-
- messageHandle_0_10.addContent(completeContentBody_0_10);
- final StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = messageHandle_0_10.allContentAdded();
- long messageid_0_10 = storedMessage_0_10.getMessageNumber();
- ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore();
-
- /*
- * reload the store only (read-only)
- */
- reopenStore();
-
- /*
- * Read back and validate the 0-8 message metadata and content
- */
- BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore();
- StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8);
-
- assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal());
- assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
- MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
-
- assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
-
- MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
- assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
- assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
- assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
- assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
-
- ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
- assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId());
- assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight());
- assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize());
-
- BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties();
- assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
- assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
-
- ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ;
- long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
- assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8);
- String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
- assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
-
- /*
- * Read back and validate the 0-10 message metadata and content
- */
- StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10);
-
- assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal());
- assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
- MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
-
- assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
-
- DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties();
- assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
- assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
- assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
- assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
- assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
- assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
-
- MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties();
- assertNotNull("MessageProperties were not returned", returnedMsgProps);
- assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
- assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
- assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
-
- ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
- long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent);
- assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
-
- String returnedPayloadString_0_10 = new String(recoveredContent.array());
- assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
-
- reopenedBdbStore.closeMessageStore();
- }
-
- private DeliveryProperties createDeliveryProperties_0_10()
- {
- DeliveryProperties delProps_0_10 = new DeliveryProperties();
-
- delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- delProps_0_10.setImmediate(true);
- delProps_0_10.setExchange("exchange12345");
- delProps_0_10.setRoutingKey("routingKey12345");
- delProps_0_10.setExpiration(5);
- delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
-
- return delProps_0_10;
- }
-
- private MessageProperties createMessageProperties_0_10(int bodySize)
- {
- MessageProperties msgProps_0_10 = new MessageProperties();
- msgProps_0_10.setContentLength(bodySize);
- msgProps_0_10.setCorrelationId("qwerty".getBytes());
- msgProps_0_10.setContentType("text/html");
-
- return msgProps_0_10;
- }
-
-
- private MessagePublishInfo createPublishInfoBody_0_8()
- {
- return new MessagePublishInfo(new AMQShortString("exchange12345"), false, true,
- new AMQShortString("routingKey12345"));
-
- }
-
- private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
- {
- return new ContentHeaderBody(props, length);
- }
-
- private BasicContentHeaderProperties createContentHeaderProperties_0_8()
- {
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
- props.setContentType("text/html");
- props.getHeaders().setString("Test", "MST");
- return props;
- }
-
- public void testGetContentWithOffset() throws Exception
- {
- BDBMessageStore bdbStore = (BDBMessageStore) getStore();
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
-
- // normal case: offset is 0
- ByteBuffer dst = ByteBuffer.allocate(10);
- int length = bdbStore.getContent(messageid_0_8, 0, dst);
- assertEquals("Unexpected length", CONTENT_BYTES.length, length);
- byte[] array = dst.array();
- assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array));
-
- // offset is in the middle
- dst = ByteBuffer.allocate(10);
- length = bdbStore.getContent(messageid_0_8, 5, dst);
- assertEquals("Unexpected length", 5, length);
- array = dst.array();
- byte[] expected = new byte[10];
- System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5);
- assertTrue("Unexpected content", Arrays.equals(expected, array));
-
- // offset beyond the content length
- dst = ByteBuffer.allocate(10);
- try
- {
- bdbStore.getContent(messageid_0_8, 15, dst);
- fail("Should fail for the offset greater than message size");
- }
- catch (RuntimeException e)
- {
- assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id "
- + messageid_0_8 + "!", e.getCause().getMessage());
- }
-
- // buffer is smaller then message size
- dst = ByteBuffer.allocate(5);
- length = bdbStore.getContent(messageid_0_8, 0, dst);
- assertEquals("Unexpected length", 5, length);
- array = dst.array();
- expected = new byte[5];
- System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5);
- assertTrue("Unexpected content", Arrays.equals(expected, array));
-
- // buffer is smaller then message size, offset is not 0
- dst = ByteBuffer.allocate(5);
- length = bdbStore.getContent(messageid_0_8, 2, dst);
- assertEquals("Unexpected length", 5, length);
- array = dst.array();
- expected = new byte[5];
- System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
- assertTrue("Unexpected content", Arrays.equals(expected, array));
- }
-
- /**
- * Tests that messages which are added to the store and then removed using the
- * public MessageStore interfaces are actually removed from the store by then
- * interrogating the store with its own implementation methods and verifying
- * expected exceptions are thrown to indicate the message is not present.
- */
- public void testMessageCreationAndRemoval() throws Exception
- {
- BDBMessageStore bdbStore = (BDBMessageStore) getStore();
-
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
-
- bdbStore.removeMessage(messageid_0_8, true);
-
- //verify the removal using the BDB store implementation methods directly
- try
- {
- // the next line should throw since the message id should not be found
- bdbStore.getMessageMetaData(messageid_0_8);
- fail("No exception thrown when message id not found getting metadata");
- }
- catch (StoreException e)
- {
- // pass since exception expected
- }
-
- //expecting no content, allocate a 1 byte
- ByteBuffer dst = ByteBuffer.allocate(1);
-
- assertEquals("Retrieved content when none was expected",
- 0, bdbStore.getContent(messageid_0_8, 0, dst));
- }
-
- private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
- {
- ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES);
-
- int bodySize = CONTENT_BYTES.length;
-
- //create and store the message using the MessageStore interface
- MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
- BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
-
- ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
-
- MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
- MessageHandle<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
-
- storedMessage_0_8.addContent(chunk1);
- ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
-
- return storedMessage_0_8.allContentAdded();
- }
-
- public void testOnDelete() throws Exception
- {
- String storeLocation = getStore().getStoreLocation();
-
- File location = new File(storeLocation);
- assertTrue("Store does not exist at " + storeLocation, location.exists());
-
- getStore().closeMessageStore();
- assertTrue("Store does not exist at " + storeLocation, location.exists());
-
- BDBVirtualHost mockVH = mock(BDBVirtualHost.class);
- String testLocation = getStore().getStoreLocation();
- when(mockVH.getStorePath()).thenReturn(testLocation);
-
- getStore().onDelete(mockVH);
-
- assertFalse("Store exists at " + storeLocation, location.exists());
- }
-
-
- @Override
- protected VirtualHost createVirtualHost()
- {
- _storeLocation = TMP_FOLDER + File.separator + getTestName();
- deleteStoreIfExists();
-
- final BDBVirtualHost parent = mock(BDBVirtualHost.class);
- when(parent.getStorePath()).thenReturn(_storeLocation);
- return parent;
- }
-
- private void deleteStoreIfExists()
- {
- if (_storeLocation != null)
- {
- File location = new File(_storeLocation);
- if (location.exists())
- {
- FileUtils.delete(location, true);
- }
- }
- }
-
- @Override
- protected MessageStore createMessageStore()
- {
- return new BDBMessageStore();
- }
-
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
deleted file mode 100644
index 4b5069fec8..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.management.MBeanServerConnection;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.url.URLSyntaxException;
-
-/**
- * Prepares an older version brokers BDB store with the required
- * contents for use in the BDBStoreUpgradeTest.
- *
- * NOTE: Must be used with the equivalent older version client!
- *
- * The store will then be used to verify that the upgraded is
- * completed properly and that once upgraded it functions as
- * expected with the new broker.
- *
- */
-public class BDBStoreUpgradeTestPreparer
-{
- private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgradeTestPreparer.class);
-
- public static final String TOPIC_NAME="myUpgradeTopic";
- public static final String SUB_NAME="myDurSubName";
- public static final String SELECTOR_SUB_NAME="mySelectorDurSubName";
- public static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic";
- public static final String QUEUE_NAME="myUpgradeQueue";
- public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
-
- public static final String PRIORITY_QUEUE_NAME="myPriorityQueue";
- public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
- public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner";
- public static final String MISUSED_OWNER = "misused-owner-as-description";
- private static final String VIRTUAL_HOST_NAME = "test";
- private static final String SORTED_QUEUE_NAME = "mySortedQueue";
- private static final String SORT_KEY = "mySortKey";
- private static final String TEST_EXCHANGE_NAME = "myCustomExchange";
- private static final String TEST_QUEUE_NAME = "myCustomQueue";
-
- private static AMQConnectionFactory _connFac;
- private static final String CONN_URL = "amqp://guest:guest@clientid/" + VIRTUAL_HOST_NAME + "?brokerlist='tcp://localhost:5672'";
-
- /**
- * Create a BDBStoreUpgradeTestPreparer instance
- */
- public BDBStoreUpgradeTestPreparer () throws URLSyntaxException
- {
- _connFac = new AMQConnectionFactory(CONN_URL);
- }
-
- private void prepareBroker() throws Exception
- {
- prepareQueues();
- prepareNonDurableQueue();
- prepareDurableSubscriptionWithSelector();
- prepareDurableSubscriptionWithoutSelector();
- }
-
- private void prepareNonDurableQueue() throws Exception
- {
- Connection connection = _connFac.createConnection();
- AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQShortString queueName = new AMQShortString(NON_DURABLE_QUEUE_NAME);
- AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME);
- session.sendCreateQueue(queueName, false, false, false, null);
- session.bindQueue(queueName, queueName, null, new AMQShortString("amq.direct"), destination);
- MessageProducer messageProducer = session.createProducer(destination);
- sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3);
- connection.close();
- }
-
- /**
- * Prepare a queue for use in testing message and binding recovery
- * after the upgrade is performed.
- *
- * - Create a transacted session on the connection.
- * - Use a consumer to create the (durable by default) queue.
- * - Send 5 large messages to test (multi-frame) content recovery.
- * - Send 1 small message to test (single-frame) content recovery.
- * - Commit the session.
- * - Send 5 small messages to test that uncommitted messages are not recovered.
- * following the upgrade.
- * - Close the session.
- */
- private void prepareQueues() throws Exception
- {
- // Create a connection
- Connection connection = _connFac.createConnection();
- connection.start();
- connection.setExceptionListener(new ExceptionListener()
- {
- public void onException(JMSException e)
- {
- _logger.error("Error setting exception listener for connection", e);
- }
- });
- // Create a session on the connection, transacted to confirm delivery
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue(QUEUE_NAME);
- // Create a consumer to ensure the queue gets created
- // (and enter it into the store, as queues are made durable by default)
- MessageConsumer messageConsumer = session.createConsumer(queue);
- messageConsumer.close();
-
- // Create a Message priorityQueueProducer
- MessageProducer messageProducer = session.createProducer(queue);
-
- // Publish 5 persistent messages, 256k chars to ensure they are multi-frame
- sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5);
- // Publish 5 persistent messages, 1k chars to ensure they are single-frame
- sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
-
- session.commit();
-
- // Publish 5 persistent messages which will NOT be committed and so should be 'lost'
- sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
- messageProducer.close();
- session.close();
-
- session = connection.createSession(true, Session.SESSION_TRANSACTED);
- // Create a priority queue on broker
- final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>();
- priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES,10);
- Queue priorityQueue = createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments);
- MessageProducer priorityQueueProducer = session.createProducer(priorityQueue);
-
- for (int msg = 0; msg < 5; msg++)
- {
- priorityQueueProducer.setPriority(msg % 10);
- Message message = session.createTextMessage(generateString(256*1024));
- message.setIntProperty("ID", msg);
- priorityQueueProducer.send(message);
- }
- session.commit();
- priorityQueueProducer.close();
-
- // Create a queue that has a DLQ
- final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>();
- queueWithDLQArguments.put("x-qpid-dlq-enabled", true);
- queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2);
- createAndBindQueueOnBroker(session, QUEUE_WITH_DLQ_NAME, queueWithDLQArguments);
-
- // Send message to the DLQ
- Queue dlq = session.createQueue("fanout://" + QUEUE_WITH_DLQ_NAME + "_DLE//does-not-matter");
- MessageProducer dlqMessageProducer = session.createProducer(dlq);
- sendMessages(session, dlqMessageProducer, dlq, DeliveryMode.PERSISTENT, 1*1024, 1);
- session.commit();
-
- // Create a queue with JMX specifying an owner, so it can later be moved into description
- createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments);
-
- createExchange(TEST_EXCHANGE_NAME, "direct");
- Queue customQueue = createAndBindQueueOnBroker(session, TEST_QUEUE_NAME, null, TEST_EXCHANGE_NAME, "direct");
- MessageProducer customQueueMessageProducer = session.createProducer(customQueue);
- sendMessages(session, customQueueMessageProducer, customQueue, DeliveryMode.PERSISTENT, 1*1024, 1);
- session.commit();
- customQueueMessageProducer.close();
-
- prepareSortedQueue(session, SORTED_QUEUE_NAME, SORT_KEY);
-
- session.close();
- connection.close();
- }
-
- private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception
- {
- return createAndBindQueueOnBroker(session, queueName, arguments, "amq.direct", "direct");
- }
-
- private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments, String exchangeName, String exchangeType) throws Exception
- {
- ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
- Queue queue = session.createQueue("BURL:" + exchangeType + "://" + exchangeName + "/" + queueName + "/" + queueName + "?durable='true'");
- ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
- return queue;
- }
-
- private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments) throws Exception
- {
- JMXConnector jmxConnector = createJMXConnector();
- try
- {
- MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
- ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\"");
-
- Object[] params = new Object[] {queueName, owner, true, arguments};
- String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()};
- mbsc.invoke(virtualHost, "createNewQueue", params, signature);
-
- ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\",name=\"amq.direct\",ExchangeType=direct");
- mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()});
- }
- finally
- {
- jmxConnector.close();
- }
- }
-
- private void createExchange(String exchangeName, String exchangeType) throws Exception
- {
- JMXConnector jmxConnector = createJMXConnector();
- try
- {
- MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
- ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\"");
-
- Object[] params = new Object[]{exchangeName, exchangeType, true};
- String[] signature = new String[]{String.class.getName(), String.class.getName(), boolean.class.getName()};
- mbsc.invoke(virtualHost, "createNewExchange", params, signature);
- }
- finally
- {
- jmxConnector.close();
- }
- }
-
- private JMXConnector createJMXConnector() throws Exception
- {
- Map<String, Object> environment = new HashMap<>();
- environment.put(JMXConnector.CREDENTIALS, new String[] {"admin", "admin"});
- JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
- return JMXConnectorFactory.connect(url, environment);
- }
-
- private void prepareSortedQueue(Session session, String queueName, String sortKey) throws Exception
- {
- final Map<String, Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.queue_sort_key", sortKey);
- Queue sortedQueue = createAndBindQueueOnBroker(session, queueName, arguments);
-
- MessageProducer messageProducer2 = session.createProducer(sortedQueue);
-
- String[] sortKeys = {"c", "b", "e", "a", "d"};
- for (int i = 1; i <= sortKeys.length; i++)
- {
- Message message = session.createTextMessage(generateString(256*1024));
- message.setIntProperty("ID", i);
- message.setStringProperty(sortKey, sortKeys[i - 1]);
- messageProducer2.send(message);
- }
- session.commit();
- }
-
- /**
- * Prepare a DurableSubscription backing queue for use in testing selector
- * recovery and queue exclusivity marking during the upgrade process.
- *
- * - Create a transacted session on the connection.
- * - Open and close a DurableSubscription with selector to create the backing queue.
- * - Send a message which matches the selector.
- * - Send a message which does not match the selector.
- * - Send a message which matches the selector but will remain uncommitted.
- * - Close the session.
- */
- private void prepareDurableSubscriptionWithSelector() throws Exception
- {
-
- // Create a connection
- TopicConnection connection = _connFac.createTopicConnection();
- connection.start();
- connection.setExceptionListener(new ExceptionListener()
- {
- public void onException(JMSException e)
- {
- _logger.error("Error setting exception listener for connection", e);
- }
- });
- // Create a session on the connection, transacted to confirm delivery
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Topic topic = session.createTopic(SELECTOR_TOPIC_NAME);
-
- // Create and register a durable subscriber with selector and then close it
- TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
- durSub1.close();
-
- // Create a publisher and send a persistent message which matches the selector
- // followed by one that does not match, and another which matches but is not
- // committed and so should be 'lost'
- TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
- TopicPublisher publisher = pubSession.createPublisher(topic);
-
- publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
- publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
- pubSession.commit();
- publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
-
- publisher.close();
- pubSession.close();
- connection.close();
- }
-
- /**
- * Prepare a DurableSubscription backing queue for use in testing use of
- * DurableSubscriptions without selectors following the upgrade process.
- *
- * - Create a transacted session on the connection.
- * - Open and close a DurableSubscription without selector to create the backing queue.
- * - Send a message which matches the subscription and commit session.
- * - Close the session.
- */
- private void prepareDurableSubscriptionWithoutSelector() throws Exception
- {
- // Create a connection
- TopicConnection connection = _connFac.createTopicConnection();
- connection.start();
- connection.setExceptionListener(new ExceptionListener()
- {
- public void onException(JMSException e)
- {
- _logger.error("Error setting exception listener for connection", e);
- }
- });
- // Create a session on the connection, transacted to confirm delivery
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Topic topic = session.createTopic(TOPIC_NAME);
-
- // Create and register a durable subscriber without selector and then close it
- TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME);
- durSub1.close();
-
- // Create a publisher and send a persistent message which matches the subscription
- TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
- TopicPublisher publisher = pubSession.createPublisher(topic);
-
- publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent");
- pubSession.commit();
-
- publisher.close();
- pubSession.close();
- connection.close();
- }
-
- private static void sendMessages(Session session, MessageProducer messageProducer,
- Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
- {
- for (int i = 1; i <= numMesages; i++)
- {
- Message message = session.createTextMessage(generateString(length));
- message.setIntProperty("ID", i);
- messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- }
-
- private static void publishMessages(Session session, TopicPublisher publisher,
- Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
- {
- for (int i = 1; i <= numMesages; i++)
- {
- Message message = session.createTextMessage(generateString(length));
- message.setIntProperty("ID", i);
- message.setStringProperty("testprop", selectorProperty);
- publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- }
-
- /**
- * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
- *
- * @param length number of characters in the string
- * @return string sequence of the given length
- */
- private static String generateString(int length)
- {
- char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'};
- char[] chars = new char[length];
- for (int i = 0; i < (length); i++)
- {
- chars[i] = base_chars[i % 10];
- }
- return new String(chars);
- }
-
- /**
- * Run the preparation tool.
- * @param args Command line arguments.
- */
- public static void main(String[] args) throws Exception
- {
- System.setProperty("qpid.dest_syntax", "BURL");
- BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
- producer.prepareBroker();
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvHomeRegistryTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvHomeRegistryTest.java
deleted file mode 100644
index 5de51df992..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvHomeRegistryTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class EnvHomeRegistryTest extends TestCase
-{
-
- private final EnvHomeRegistry _ehr = new EnvHomeRegistry();
-
- public void testDuplicateEnvHomeRejected() throws Exception
- {
- File home = new File(QpidTestCase.TMP_FOLDER, getName());
-
- _ehr.registerHome(home);
- try
- {
- _ehr.registerHome(home);
- fail("Exception not thrown");
- }
- catch (IllegalArgumentException iae)
- {
- // PASS
- }
- }
-
- public void testUniqueEnvHomesAllowed() throws Exception
- {
- File home1 = new File(QpidTestCase.TMP_FOLDER, getName() + "1");
- File home2 = new File(QpidTestCase.TMP_FOLDER, getName() + "2");
-
- _ehr.registerHome(home1);
- _ehr.registerHome(home2);
- }
-
- public void testReuseOfEnvHomesAllowed() throws Exception
- {
- File home = new File(QpidTestCase.TMP_FOLDER, getName() + "1");
-
- _ehr.registerHome(home);
-
- _ehr.deregisterHome(home);
-
- _ehr.registerHome(home);
- }
-} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopConfigurationChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopConfigurationChangeListener.java
deleted file mode 100644
index b185a31c3b..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopConfigurationChangeListener.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.model.ConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.State;
-
-public class NoopConfigurationChangeListener implements ConfigurationChangeListener
-{
-
- public NoopConfigurationChangeListener() {
- }
-
- @Override
- public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
- {
- }
-
- @Override
- public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
- {
- }
-
- @Override
- public void childRemoved(ConfiguredObject<?> object, ConfiguredObject<?> child)
- {
- }
-
- @Override
- public void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue,
- Object newAttributeValue)
- {
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
deleted file mode 100644
index 3403ec53dc..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.FileUtils;
-
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-
-public class StandardEnvironmentFacadeTest extends QpidTestCase
-{
- protected File _storePath;
- protected EnvironmentFacade _environmentFacade;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName());
- }
-
- protected void tearDown() throws Exception
- {
- try
- {
- super.tearDown();
- if (_environmentFacade != null)
- {
- _environmentFacade.close();
- }
- }
- finally
- {
- if (_storePath != null)
- {
- FileUtils.delete(_storePath, true);
- }
- }
- }
-
- public void testEnvironmentFacade() throws Exception
- {
- EnvironmentFacade ef = createEnvironmentFacade();
- assertNotNull("Environment should not be null", ef);
- Environment e = ef.getEnvironment();
- assertTrue("Environment is not valid", e.isValid());
- }
-
- public void testSecondEnvironmentFacadeUsingSamePathRejected() throws Exception
- {
- EnvironmentFacade ef = createEnvironmentFacade();
- assertNotNull("Environment should not be null", ef);
- try
- {
- createEnvironmentFacade();
- fail("Exception not thrown");
- }
- catch (IllegalArgumentException iae)
- {
- // PASS
- }
-
- ef.close();
-
- EnvironmentFacade ef2 = createEnvironmentFacade();
- assertNotNull("Environment should not be null", ef2);
- }
-
- public void testClose() throws Exception
- {
- EnvironmentFacade ef = createEnvironmentFacade();
- ef.close();
- Environment e = ef.getEnvironment();
-
- assertNull("Environment should be null after facade close", e);
- }
-
- public void testOverrideJeParameter() throws Exception
- {
- String statCollectVarName = EnvironmentConfig.STATS_COLLECT;
-
- EnvironmentFacade ef = createEnvironmentFacade();
- assertEquals("false", ef.getEnvironment().getMutableConfig().getConfigParam(statCollectVarName));
- ef.close();
-
- ef = createEnvironmentFacade(Collections.singletonMap(statCollectVarName, "true"));
- assertEquals("true", ef.getEnvironment().getMutableConfig().getConfigParam(statCollectVarName));
- ef.close();
- }
-
-
- public void testOpenDatabaseReusesCachedHandle() throws Exception
- {
- DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true);
-
- EnvironmentFacade ef = createEnvironmentFacade();
- Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
- assertNotNull(handle1);
-
- Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
- assertSame("Database handle should be cached", handle1, handle2);
-
- ef.closeDatabase("myDatabase");
-
- Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
- assertNotSame("Expecting a new handle after database closure", handle1, handle3);
- }
-
- EnvironmentFacade createEnvironmentFacade()
- {
- _environmentFacade = createEnvironmentFacade(Collections.<String, String>emptyMap());
- return _environmentFacade;
-
- }
-
- EnvironmentFacade createEnvironmentFacade(Map<String, String> map)
- {
- StandardEnvironmentConfiguration sec = mock(StandardEnvironmentConfiguration.class);
- when(sec.getName()).thenReturn(getTestName());
- when(sec.getParameters()).thenReturn(map);
- when(sec.getStorePath()).thenReturn(_storePath.getAbsolutePath());
-
- return new StandardEnvironmentFacade(sec);
- }
-
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
deleted file mode 100644
index b8c3b493bc..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ /dev/null
@@ -1,1057 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.replication;
-
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.StringBinding;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Durability;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.Transaction;
-import com.sleepycat.je.rep.NodeState;
-import com.sleepycat.je.rep.ReplicaWriteException;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicatedEnvironment.State;
-import com.sleepycat.je.rep.ReplicationConfig;
-import com.sleepycat.je.rep.ReplicationNode;
-import com.sleepycat.je.rep.StateChangeEvent;
-import com.sleepycat.je.rep.StateChangeListener;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.test.utils.PortHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.TestFileUtils;
-import org.apache.qpid.util.FileUtils;
-
-public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(ReplicatedEnvironmentFacadeTest.class);
- private static final int LISTENER_TIMEOUT = 5;
- private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
-
- private final PortHelper _portHelper = new PortHelper();
-
- private final String TEST_GROUP_NAME = "testGroupName";
- private final String TEST_NODE_NAME = "testNodeName";
- private final int TEST_NODE_PORT = _portHelper.getNextAvailable();
- private final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
- private final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
- private final Durability TEST_DURABILITY = Durability.parse("SYNC,NO_SYNC,SIMPLE_MAJORITY");
- private final boolean TEST_DESIGNATED_PRIMARY = false;
- private final int TEST_PRIORITY = 1;
- private final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
-
- private File _storePath;
- private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- _storePath = TestFileUtils.createTestDirectory("bdb", true);
-
- setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- for (EnvironmentFacade ef : _nodes.values())
- {
- ef.close();
- }
- }
- finally
- {
- try
- {
- if (_storePath != null)
- {
- FileUtils.delete(_storePath, true);
- }
- }
- finally
- {
- super.tearDown();
- }
- }
-
- _portHelper.waitUntilAllocatedPortsAreFree();
- }
- public void testEnvironmentFacade() throws Exception
- {
- EnvironmentFacade ef = createMaster();
- assertNotNull("Environment should not be null", ef);
- Environment e = ef.getEnvironment();
- assertTrue("Environment is not valid", e.isValid());
- }
-
- public void testClose() throws Exception
- {
- EnvironmentFacade ef = createMaster();
- ef.close();
- Environment e = ef.getEnvironment();
-
- assertNull("Environment should be null after facade close", e);
- }
-
- public void testOpenDatabaseReusesCachedHandle() throws Exception
- {
- DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true);
-
- EnvironmentFacade ef = createMaster();
- Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
- assertNotNull(handle1);
-
- Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
- assertSame("Database handle should be cached", handle1, handle2);
-
- ef.closeDatabase("myDatabase");
-
- Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
- assertNotSame("Expecting a new handle after database closure", handle1, handle3);
- }
-
- public void testOpenDatabaseWhenFacadeIsNotOpened() throws Exception
- {
- DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true);
-
- EnvironmentFacade ef = createMaster();
- ef.close();
-
- try
- {
- ef.openDatabase("myDatabase", createIfAbsentDbConfig );
- fail("Database open should fail");
- }
- catch(ConnectionScopedRuntimeException e)
- {
- assertEquals("Unexpected exception", "Environment facade is not in opened state", e.getMessage());
- }
- }
-
- public void testGetGroupName() throws Exception
- {
- assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName());
- }
-
- public void testGetNodeName() throws Exception
- {
- assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
- }
-
- public void testLastKnownReplicationTransactionId() throws Exception
- {
- ReplicatedEnvironmentFacade master = createMaster();
- long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId();
- assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0);
- }
-
- public void testGetNodeHostPort() throws Exception
- {
- assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
- }
-
- public void testGetHelperHostPort() throws Exception
- {
- assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort());
- }
-
- public void testSetMessageStoreDurability() throws Exception
- {
- ReplicatedEnvironmentFacade master = createMaster();
- assertEquals("Unexpected message store durability",
- new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
- master.getRealMessageStoreDurability());
- assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability());
- assertTrue("Unexpected coalescing sync", master.isCoalescingSync());
-
- master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL);
- assertEquals("Unexpected message store durability",
- new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL),
- master.getRealMessageStoreDurability());
- assertFalse("Coalescing sync committer is still running", master.isCoalescingSync());
- }
-
- public void testGetNodeState() throws Exception
- {
- assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
- }
-
- public void testPriority() throws Exception
- {
- ReplicatedEnvironmentFacade facade = createMaster();
- assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority());
- Future<Void> future = facade.setPriority(TEST_PRIORITY + 1);
- future.get(5, TimeUnit.SECONDS);
- assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority());
- }
-
- public void testDesignatedPrimary() throws Exception
- {
- ReplicatedEnvironmentFacade master = createMaster();
- assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
- Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
- future.get(5, TimeUnit.SECONDS);
- assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
- }
-
- public void testElectableGroupSizeOverride() throws Exception
- {
- ReplicatedEnvironmentFacade facade = createMaster();
- assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride());
- Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1);
- future.get(5, TimeUnit.SECONDS);
- assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
- }
-
- public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
- {
- ReplicatedEnvironmentFacade master = createMaster();
- String nodeName2 = TEST_NODE_NAME + "_2";
- String host = "localhost";
- int port = _portHelper.getNextAvailable();
- String node2NodeHostPort = host + ":" + port;
-
- final AtomicInteger invocationCount = new AtomicInteger();
- final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1);
- ReplicationGroupListener listener = new NoopReplicationGroupListener()
- {
- @Override
- public void onReplicationNodeRecovered(ReplicationNode node)
- {
- nodeRecoveryLatch.countDown();
- invocationCount.incrementAndGet();
- }
- };
-
- createReplica(nodeName2, node2NodeHostPort, listener);
-
- assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers());
-
- assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
- }
-
- public void testReplicationGroupListenerHearsNodeAdded() throws Exception
- {
- final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
- final AtomicInteger invocationCount = new AtomicInteger();
- ReplicationGroupListener listener = new NoopReplicationGroupListener()
- {
- @Override
- public void onReplicationNodeAddedToGroup(ReplicationNode node)
- {
- invocationCount.getAndIncrement();
- nodeAddedLatch.countDown();
- }
- };
-
- TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener);
- assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
-
- String node2Name = TEST_NODE_NAME + "_2";
- String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable();
- replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort));
- createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
-
- assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
-
- assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
- }
-
- public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
- {
- final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
- final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
- final AtomicInteger invocationCount = new AtomicInteger();
- ReplicationGroupListener listener = new NoopReplicationGroupListener()
- {
- @Override
- public void onReplicationNodeRecovered(ReplicationNode node)
- {
- nodeAddedLatch.countDown();
- }
-
- @Override
- public void onReplicationNodeAddedToGroup(ReplicationNode node)
- {
- nodeAddedLatch.countDown();
- }
-
- @Override
- public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
- {
- invocationCount.getAndIncrement();
- nodeDeletedLatch.countDown();
- }
- };
-
- TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener);
- assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- String node2Name = TEST_NODE_NAME + "_2";
- String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable();
- replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort));
- createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
-
- assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
-
- // Need to await the listener hearing the addition of the node to the model.
- assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- // Now remove the node and ensure we hear the event
- replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
-
- assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
-
- assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
- }
-
- public void testMasterHearsRemoteNodeRoles() throws Exception
- {
- final String node2Name = TEST_NODE_NAME + "_2";
- final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
- final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>();
- final CountDownLatch stateLatch = new CountDownLatch(1);
- final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
- ReplicationGroupListener listener = new NoopReplicationGroupListener()
- {
- @Override
- public void onReplicationNodeAddedToGroup(ReplicationNode node)
- {
- nodeRef.set(node);
- nodeAddedLatch.countDown();
- }
-
- @Override
- public void onNodeState(ReplicationNode node, NodeState nodeState)
- {
- if (node2Name.equals(node.getName()))
- {
- stateRef.set(nodeState);
- stateLatch.countDown();
- }
- }
- };
-
- TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener);
- assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable();
- replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort));
- createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
-
- assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
-
- assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- ReplicationNode remoteNode = (ReplicationNode)nodeRef.get();
- assertEquals("Unexpected node name", node2Name, remoteNode.getName());
-
- assertTrue("Node state not fired within timeout", stateLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- assertEquals("Unexpected node state", State.REPLICA, stateRef.get().getNodeState());
- }
-
- public void testRemoveNodeFromGroup() throws Exception
- {
- TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
- ReplicatedEnvironmentFacade environmentFacade = addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, stateChangeListener, new NoopReplicationGroupListener());
- assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
-
- String node2Name = TEST_NODE_NAME + "_2";
- String node2NodeHostPort = "localhost:" + _portHelper.getNextAvailable();
- ReplicatedEnvironmentFacade ref2 = createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
-
- assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers());
- ref2.close();
-
- environmentFacade.removeNodeFromGroup(node2Name);
- assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers());
- }
-
-
- public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception
- {
- final String replicaName = TEST_NODE_NAME + "_1";
- final CountDownLatch nodeRemovedLatch = new CountDownLatch(1);
- final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
- final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>();
- final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>();
- final CountDownLatch stateLatch = new CountDownLatch(1);
- final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
-
- ReplicationGroupListener listener = new NoopReplicationGroupListener()
- {
- @Override
- public void onReplicationNodeAddedToGroup(ReplicationNode node)
- {
- if (addedNodeRef.compareAndSet(null, node))
- {
- nodeAddedLatch.countDown();
- }
- }
-
- @Override
- public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
- {
- removedNodeRef.set(node);
- nodeRemovedLatch.countDown();
- }
-
- @Override
- public void onNodeState(ReplicationNode node, NodeState nodeState)
- {
- if (replicaName.equals(node.getName()))
- {
- stateRef.set(nodeState);
- stateLatch.countDown();
- }
- }
- };
-
- TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
- final ReplicatedEnvironmentFacade masterEnvironment = addNode(stateChangeListener, listener);
- assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- masterEnvironment.setDesignatedPrimary(true);
-
- int replica1Port = _portHelper.getNextAvailable();
- String node1NodeHostPort = "localhost:" + replica1Port;
- masterEnvironment.setPermittedNodes(Arrays.asList(masterEnvironment.getHostPort(), node1NodeHostPort));
- ReplicatedEnvironmentFacade replica = createReplica(replicaName, node1NodeHostPort, new NoopReplicationGroupListener());
-
- assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
-
- ReplicationNode node = addedNodeRef.get();
- assertEquals("Unexpected node name", replicaName, node.getName());
-
- assertTrue("Node state was not heard", stateLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
- assertEquals("Unexpected node role", State.REPLICA, stateRef.get().getNodeState());
- assertEquals("Unexpected node name", replicaName, stateRef.get().getNodeName());
-
- replica.close();
- masterEnvironment.removeNodeFromGroup(node.getName());
-
- assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
- assertEquals("Unexpected node is deleted", node, removedNodeRef.get());
- }
-
- public void testCloseStateTransitions() throws Exception
- {
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
-
- assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
- replicatedEnvironmentFacade.close();
- assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
- }
-
- public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
- {
- final CountDownLatch masterLatch = new CountDownLatch(1);
- final AtomicInteger masterStateChangeCount = new AtomicInteger();
- final CountDownLatch unknownLatch = new CountDownLatch(1);
- final AtomicInteger unknownStateChangeCount = new AtomicInteger();
- StateChangeListener stateChangeListener = new StateChangeListener()
- {
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
- {
- if (stateChangeEvent.getState() == State.MASTER)
- {
- masterStateChangeCount.incrementAndGet();
- masterLatch.countDown();
- }
- else if (stateChangeEvent.getState() == State.UNKNOWN)
- {
- unknownStateChangeCount.incrementAndGet();
- unknownLatch.countDown();
- }
- }
- };
-
- addNode(stateChangeListener, new NoopReplicationGroupListener());
- assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- int replica1Port = _portHelper.getNextAvailable();
- String node1NodeHostPort = "localhost:" + replica1Port;
- int replica2Port = _portHelper.getNextAvailable();
- String node2NodeHostPort = "localhost:" + replica2Port;
-
- ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
- ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener());
-
- // close replicas
- replica1.close();
- replica2.close();
-
- assertTrue("Environment should be recreated and go into unknown state",
- unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
-
- assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get());
- assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
- }
-
- public void testTransferMasterToSelf() throws Exception
- {
- final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
- final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
- StateChangeListener stateChangeListener = new StateChangeListener(){
-
- @Override
- public void stateChange(StateChangeEvent event) throws RuntimeException
- {
- ReplicatedEnvironment.State state = event.getState();
- if (state == ReplicatedEnvironment.State.REPLICA)
- {
- firstNodeReplicaStateLatch.countDown();
- }
- if (state == ReplicatedEnvironment.State.MASTER)
- {
- firstNodeMasterStateLatch.countDown();
- }
- }
- };
- ReplicatedEnvironmentFacade firstNode = addNode(stateChangeListener, new NoopReplicationGroupListener());
- assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
-
- int replica1Port = _portHelper.getNextAvailable();
- String node1NodeHostPort = "localhost:" + replica1Port;
- ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
- assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
-
- int replica2Port = _portHelper.getNextAvailable();
- String node2NodeHostPort = "localhost:" + replica2Port;
- final CountDownLatch replicaStateLatch = new CountDownLatch(1);
- final CountDownLatch masterStateLatch = new CountDownLatch(1);
- StateChangeListener testStateChangeListener = new StateChangeListener()
- {
- @Override
- public void stateChange(StateChangeEvent event) throws RuntimeException
- {
- ReplicatedEnvironment.State state = event.getState();
- if (state == ReplicatedEnvironment.State.REPLICA)
- {
- replicaStateLatch.countDown();
- }
- if (state == ReplicatedEnvironment.State.MASTER)
- {
- masterStateLatch.countDown();
- }
- }
- };
- ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY,
- testStateChangeListener, new NoopReplicationGroupListener());
- assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
- assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
-
- thirdNode.transferMasterToSelfAsynchronously();
- assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
- assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
- }
-
- public void testTransferMasterAnotherNode() throws Exception
- {
- final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
- final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
- StateChangeListener stateChangeListener = new StateChangeListener(){
-
- @Override
- public void stateChange(StateChangeEvent event) throws RuntimeException
- {
- ReplicatedEnvironment.State state = event.getState();
- if (state == ReplicatedEnvironment.State.REPLICA)
- {
- firstNodeReplicaStateLatch.countDown();
- }
- if (state == ReplicatedEnvironment.State.MASTER)
- {
- firstNodeMasterStateLatch.countDown();
- }
- }
- };
- ReplicatedEnvironmentFacade firstNode = addNode(stateChangeListener, new NoopReplicationGroupListener());
- assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
-
- int replica1Port = _portHelper.getNextAvailable();
- String node1NodeHostPort = "localhost:" + replica1Port;
- ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
- assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
-
- int replica2Port = _portHelper.getNextAvailable();
- String node2NodeHostPort = "localhost:" + replica2Port;
- final CountDownLatch replicaStateLatch = new CountDownLatch(1);
- final CountDownLatch masterStateLatch = new CountDownLatch(1);
- StateChangeListener testStateChangeListener = new StateChangeListener()
- {
- @Override
- public void stateChange(StateChangeEvent event) throws RuntimeException
- {
- ReplicatedEnvironment.State state = event.getState();
- if (state == ReplicatedEnvironment.State.REPLICA)
- {
- replicaStateLatch.countDown();
- }
- if (state == ReplicatedEnvironment.State.MASTER)
- {
- masterStateLatch.countDown();
- }
- }
- };
- String thirdNodeName = TEST_NODE_NAME + "_2";
- ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY,
- testStateChangeListener, new NoopReplicationGroupListener());
- assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
- assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
-
- firstNode.transferMasterAsynchronously(thirdNodeName);
- assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
- assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
- }
-
- public void testBeginTransaction() throws Exception
- {
- ReplicatedEnvironmentFacade facade = createMaster();
- Transaction txn = null;
- try
- {
- txn = facade.beginTransaction();
- assertNotNull("Transaction is not created", txn);
- txn.commit();
- txn = null;
- }
- finally
- {
- if (txn != null)
- {
- txn.abort();
- }
- }
- }
-
- public void testSetPermittedNodes() throws Exception
- {
- ReplicatedEnvironmentFacade firstNode = createMaster();
-
- Set<String> permittedNodes = new HashSet<String>();
- permittedNodes.add("localhost:" + TEST_NODE_PORT);
- permittedNodes.add("localhost:" + _portHelper.getNextAvailable());
- firstNode.setPermittedNodes(permittedNodes);
-
- ReplicatedEnvironmentFacade.ReplicationNodeImpl replicationNode = new ReplicatedEnvironmentFacade.ReplicationNodeImpl(TEST_NODE_NAME, TEST_NODE_HOST_PORT);
- NodeState nodeState = ReplicatedEnvironmentFacade.getRemoteNodeState(TEST_GROUP_NAME, replicationNode, 5000);
-
- ObjectMapper objectMapper = new ObjectMapper();
-
- Map<String, Object> settings = objectMapper.readValue(nodeState.getAppState(), Map.class);
- Collection<String> appStatePermittedNodes = (Collection<String>)settings.get(ReplicatedEnvironmentFacade.PERMITTED_NODE_LIST);
- assertEquals("Unexpected permitted nodes", permittedNodes, new HashSet<String>(appStatePermittedNodes));
- }
-
- public void testPermittedNodeIsAllowedToConnect() throws Exception
- {
- ReplicatedEnvironmentFacade firstNode = createMaster();
-
- int replica1Port = _portHelper.getNextAvailable();
- String node1NodeHostPort = "localhost:" + replica1Port;
-
- Set<String> permittedNodes = new HashSet<String>();
- permittedNodes.add("localhost:" + TEST_NODE_PORT);
- permittedNodes.add(node1NodeHostPort);
- firstNode.setPermittedNodes(permittedNodes);
-
- ReplicatedEnvironmentConfiguration configuration = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME + "_1", node1NodeHostPort, false);
- when(configuration.getHelperNodeName()).thenReturn(TEST_NODE_NAME);
-
- TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.REPLICA);
- ReplicatedEnvironmentFacade secondNode = createReplicatedEnvironmentFacade(TEST_NODE_NAME + "_1",
- stateChangeListener, new NoopReplicationGroupListener(), configuration);
- assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- assertEquals("Unexpected state", State.REPLICA.name(), secondNode.getNodeState());
- }
-
- public void testIntruderNodeIsDetected() throws Exception
- {
- final CountDownLatch intruderLatch = new CountDownLatch(1);
- ReplicationGroupListener listener = new NoopReplicationGroupListener()
- {
- @Override
- public boolean onIntruderNode(ReplicationNode node)
- {
- intruderLatch.countDown();
- return true;
- }
- };
- ReplicatedEnvironmentFacade firstNode = createMaster(listener);
- int replica1Port = _portHelper.getNextAvailable();
- String node1NodeHostPort = "localhost:" + replica1Port;
-
- Set<String> permittedNodes = new HashSet<String>();
- permittedNodes.add("localhost:" + TEST_NODE_PORT);
-
- firstNode.setPermittedNodes(permittedNodes);
-
- String nodeName = TEST_NODE_NAME + "_1";
- createIntruder(nodeName, node1NodeHostPort);
- assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS));
- }
-
- public void testNodeRolledback() throws Exception
- {
- DatabaseConfig createConfig = new DatabaseConfig();
- createConfig.setAllowCreate(true);
- createConfig.setTransactional(true);
-
- ReplicatedEnvironmentFacade node1 = createMaster();
-
- String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
-
- String replicaName = TEST_NODE_NAME + 1;
- ReplicatedEnvironmentFacade node2 = createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
-
- node1.setDesignatedPrimary(true);
-
- Transaction txn = node1.beginTransaction();
- Database db = node1.getEnvironment().openDatabase(txn, "mydb", createConfig);
- txn.commit();
-
- // Put a record (that will be replicated)
- putRecord(node1, db, 1, "value1");
-
- node2.close();
-
- // Put a record (that will be only on node1 as node2 is now offline)
- putRecord(node1, db, 2, "value2");
-
- db.close();
-
- // Stop node1
- node1.close();
-
- // Restart the node2, making it primary so it becomes master
- TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(State.MASTER);
- node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener());
- boolean awaitForStateChange = node2StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
- assertTrue(replicaName + " did not go into desired state; current actual state is "
- + node2StateChangeListener.getCurrentActualState(), awaitForStateChange);
-
- txn = node2.beginTransaction();
- db = node2.getEnvironment().openDatabase(txn, "mydb", DatabaseConfig.DEFAULT);
- txn.commit();
-
- // Do a transaction on node2. The two environments will have diverged
- putRecord(node2, db, 3, "diverged");
-
- // Now restart node1 and ensure that it realises it needs to rollback before it can rejoin.
- TestStateChangeListener node1StateChangeListener = new TestStateChangeListener(State.REPLICA);
- final CountDownLatch _replicaRolledback = new CountDownLatch(1);
- node1 = addNode(node1StateChangeListener, new NoopReplicationGroupListener()
- {
- @Override
- public void onNodeRolledback()
- {
- _replicaRolledback.countDown();
- }
- });
- assertTrue("Node 1 did not go into desired state and remained in state " + node1.getNodeState(),
- node1StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- assertTrue("Node 1 did not experience rollback within timeout",
- _replicaRolledback.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
-
- // Finally do one more transaction through the master
- putRecord(node2, db, 4, "value4");
- db.close();
-
- node1.close();
- node2.close();
- }
-
- public void testReplicaTransactionBeginsImmediately() throws Exception
- {
- ReplicatedEnvironmentFacade master = createMaster();
- String nodeName2 = TEST_NODE_NAME + "_2";
- String host = "localhost";
- int port = _portHelper.getNextAvailable();
- String node2NodeHostPort = host + ":" + port;
-
- final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() );
-
- // close the master
- master.close();
-
- // try to create a transaction in a separate thread
- // and make sure that transaction is created immediately.
- ExecutorService service = Executors.newSingleThreadExecutor();
- try
- {
-
- Future<Transaction> future = service.submit(new Callable<Transaction>(){
-
- @Override
- public Transaction call() throws Exception
- {
- return replica.getEnvironment().beginTransaction(null, null);
- }
- });
- Transaction transaction = future.get(5, TimeUnit.SECONDS);
- assertNotNull("Transaction was not created during expected time", transaction);
- transaction.abort();
- }
- finally
- {
- service.shutdown();
- }
- }
-
- public void testReplicaWriteExceptionIsConvertedIntoConnectionScopedRuntimeException() throws Exception
- {
- ReplicatedEnvironmentFacade master = createMaster();
- String nodeName2 = TEST_NODE_NAME + "_2";
- String host = "localhost";
- int port = _portHelper.getNextAvailable();
- String node2NodeHostPort = host + ":" + port;
-
- final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() );
-
- // close the master
- master.close();
-
- try
- {
- replica.openDatabase("test", DatabaseConfig.DEFAULT.setAllowCreate(true) );
- fail("Replica write operation should fail");
- }
- catch(ReplicaWriteException e)
- {
- RuntimeException handledException = master.handleDatabaseException("test", e);
- assertTrue("Unexpected exception", handledException instanceof ConnectionScopedRuntimeException);
- }
- }
-
- private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue,
- final String dataValue)
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
-
- Transaction txn = master.beginTransaction();
- IntegerBinding.intToEntry(keyValue, key);
- StringBinding.stringToEntry(dataValue, data);
-
- db.put(txn, key, data);
- txn.commit();
- }
-
-
- private void createIntruder(String nodeName, String node1NodeHostPort)
- {
- File environmentPathFile = new File(_storePath, nodeName);
- environmentPathFile.mkdirs();
-
- ReplicationConfig replicationConfig = new ReplicationConfig(TEST_GROUP_NAME, nodeName, node1NodeHostPort);
- replicationConfig.setHelperHosts(TEST_NODE_HOST_PORT);
-
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setDurability(TEST_DURABILITY);
- ReplicatedEnvironment intruder = null;
- try
- {
- intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
- }
- finally
- {
- if (intruder != null)
- {
- intruder.close();
- }
- }
- }
-
- private ReplicatedEnvironmentFacade createMaster() throws Exception
- {
- return createMaster(new NoopReplicationGroupListener());
- }
-
- private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener) throws Exception
- {
- TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
- ReplicatedEnvironmentFacade env = addNode(stateChangeListener, replicationGroupListener);
- assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- return env;
- }
-
- private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) throws Exception
- {
- TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
- return createReplica(nodeName, nodeHostPort, testStateChangeListener, replicationGroupListener);
- }
-
- private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort,
- TestStateChangeListener testStateChangeListener, ReplicationGroupListener replicationGroupListener)
- throws InterruptedException
- {
- ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY,
- testStateChangeListener, replicationGroupListener);
- boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
- assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange);
- return replicaEnvironmentFacade;
- }
-
- private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
- StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
- {
- ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
- return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config);
- }
-
- private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, ReplicatedEnvironmentConfiguration config) {
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
- ref.setStateChangeListener(stateChangeListener);
- ref.setReplicationGroupListener(replicationGroupListener);
- ref.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
- _nodes.put(nodeName, ref);
- return ref;
- }
-
- private ReplicatedEnvironmentFacade addNode(StateChangeListener stateChangeListener,
- ReplicationGroupListener replicationGroupListener)
- {
- return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY,
- stateChangeListener, replicationGroupListener);
- }
-
- private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
- {
- ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
- when(node.getName()).thenReturn(nodeName);
- when(node.getHostPort()).thenReturn(nodeHostPort);
- when(node.isDesignatedPrimary()).thenReturn(designatedPrimary);
- when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
- when(node.getPriority()).thenReturn(TEST_PRIORITY);
- when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
- when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
- when(node.getHelperNodeName()).thenReturn(TEST_NODE_NAME);
-
- when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME), anyInt())).thenReturn(60000);
- when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME), anyInt())).thenReturn(10000);
- when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME), anyInt())).thenReturn(1000);
- when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME), anyInt())).thenReturn(3);
- when(node.getFacadeParameter(eq(ReplicatedEnvironmentFacade.EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME), anyInt())).thenReturn(10000);
-
- Map<String, String> repConfig = new HashMap<String, String>();
- repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
- repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
- when(node.getReplicationParameters()).thenReturn(repConfig);
- when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
- return node;
- }
-
- class NoopReplicationGroupListener implements ReplicationGroupListener
- {
-
- @Override
- public void onReplicationNodeAddedToGroup(ReplicationNode node)
- {
- }
-
- @Override
- public void onReplicationNodeRecovered(ReplicationNode node)
- {
- }
-
- @Override
- public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
- {
- }
-
- @Override
- public void onNodeState(ReplicationNode node, NodeState nodeState)
- {
- }
-
- @Override
- public boolean onIntruderNode(ReplicationNode node)
- {
- LOGGER.warn("Intruder node " + node);
- return true;
- }
-
- @Override
- public void onNoMajority()
- {
- }
-
- @Override
- public void onNodeRolledback()
- {
- }
-
- @Override
- public void onException(Exception e)
- {
- }
-
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
deleted file mode 100644
index 0870191b35..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.replication;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.sleepycat.je.rep.ReplicatedEnvironment.State;
-import com.sleepycat.je.rep.StateChangeEvent;
-import com.sleepycat.je.rep.StateChangeListener;
-
-class TestStateChangeListener implements StateChangeListener
-{
- private final Set<State> _expectedStates;
- private final CountDownLatch _latch;
- private final AtomicReference<State> _currentActualState = new AtomicReference<State>();
-
- public TestStateChangeListener(State expectedState)
- {
- this(Collections.singleton(expectedState));
- }
-
- public TestStateChangeListener(Set<State> expectedStates)
- {
- _expectedStates = new HashSet<State>(expectedStates);
- _latch = new CountDownLatch(1);
- }
-
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
- {
- _currentActualState.set(stateChangeEvent.getState());
- if (_expectedStates.contains(stateChangeEvent.getState()))
- {
- _latch.countDown();
- }
- }
-
- public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException
- {
- return _latch.await(timeout, timeUnit);
- }
-
- public State getCurrentActualState()
- {
- return _currentActualState.get();
- }
-} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
deleted file mode 100644
index 965cad1cb5..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuple;
-
-import java.util.Collections;
-import java.util.Map;
-import junit.framework.TestCase;
-
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-
-public class ConfiguredObjectBindingTest extends TestCase
-{
-
- private ConfiguredObjectRecord _object;
-
- private static final Map<String, Object> DUMMY_ATTRIBUTES_MAP =
- Collections.singletonMap("dummy",(Object) "attributes");
-
- private static final String DUMMY_TYPE_STRING = "dummyType";
- private ConfiguredObjectBinding _configuredObjectBinding;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- _object = new ConfiguredObjectRecordImpl(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING,
- DUMMY_ATTRIBUTES_MAP);
- }
-
- public void testObjectToEntryAndEntryToObject()
- {
- TupleOutput tupleOutput = new TupleOutput();
-
- _configuredObjectBinding.objectToEntry(_object, tupleOutput);
-
- byte[] entryAsBytes = tupleOutput.getBufferBytes();
- TupleInput tupleInput = new TupleInput(entryAsBytes);
-
- ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput);
- assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_MAP, storedObject.getAttributes());
- assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType());
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
deleted file mode 100644
index ce143aba1b..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.upgrade;
-
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NONEXCLUSIVE_WITH_ERRONEOUS_OWNER;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.InputStream;
-import java.util.UUID;
-
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.subjects.TestBlankSubject;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.FileUtils;
-
-import com.sleepycat.je.Database;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.Transaction;
-
-public abstract class AbstractUpgradeTestCase extends QpidTestCase
-{
- protected static final class StaticAnswerHandler implements UpgradeInteractionHandler
- {
- private UpgradeInteractionResponse _response;
-
- public StaticAnswerHandler(UpgradeInteractionResponse response)
- {
- _response = response;
- }
-
- @Override
- public UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse,
- UpgradeInteractionResponse... possibleResponses)
- {
- return _response;
- }
- }
-
- public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", QUEUE_NAME, NON_DURABLE_QUEUE_NAME,
- NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, PRIORITY_QUEUE_NAME, QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME + "_DLQ" };
- public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0, 0, 0, 1};
- public static int TOTAL_MESSAGE_NUMBER = 16;
- protected static final LogSubject LOG_SUBJECT = new TestBlankSubject();
-
- // myQueueWithDLQ_DLQ is not bound to the default exchange
- protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2 - 1;
- protected static final int TOTAL_EXCHANGES = 6;
-
- private File _storeLocation;
- protected Environment _environment;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _storeLocation = copyStore(getStoreDirectoryName());
-
- _environment = createEnvironment(_storeLocation);
- }
-
- /** @return eg "bdbstore-v4" - used for copying store */
- protected abstract String getStoreDirectoryName();
-
- protected Environment createEnvironment(File storeLocation)
- {
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam("je.lock.nLockTables", "7");
- envConfig.setReadOnly(false);
- envConfig.setSharedCache(false);
- envConfig.setCacheSize(0);
- return new Environment(storeLocation, envConfig);
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- _environment.close();
- }
- finally
- {
- _environment = null;
- deleteDirectoryIfExists(_storeLocation);
- }
- super.tearDown();
- }
-
- private File copyStore(String storeDirectoryName) throws Exception
- {
- File storeLocation = new File(new File(TMP_FOLDER), "test-store");
- deleteDirectoryIfExists(storeLocation);
- storeLocation.mkdirs();
- int index = 0;
- String prefix = "0000000";
- String extension = ".jdb";
- InputStream is = null;
- do
- {
- String fileName = prefix + index + extension;
- is = getClass().getClassLoader().getResourceAsStream("upgrade/" + storeDirectoryName + "/test-store/" + fileName);
- if (is != null)
- {
- FileUtils.copy(is, new File(storeLocation, fileName));
- }
- index++;
- }
- while (is != null);
- return storeLocation;
- }
-
- protected void deleteDirectoryIfExists(File dir)
- {
- if (dir.exists())
- {
- assertTrue("The provided file " + dir + " is not a directory", dir.isDirectory());
-
- boolean deletedSuccessfully = FileUtils.delete(dir, true);
-
- assertTrue("Files at '" + dir + "' should have been deleted", deletedSuccessfully);
- }
- }
-
- protected void assertDatabaseRecordCount(String databaseName, final long expectedCountNumber)
- {
- long count = getDatabaseCount(databaseName);
- assertEquals("Unexpected database '" + databaseName + "' entry number", expectedCountNumber, count);
- }
-
- protected long getDatabaseCount(String databaseName)
- {
- DatabaseCallable<Long> operation = new DatabaseCallable<Long>()
- {
-
- @Override
- public Long call(Database sourceDatabase, Database targetDatabase, Transaction transaction)
- {
- return new Long(sourceDatabase.count());
-
- }
- };
- Long count = new DatabaseTemplate(_environment, databaseName, null).call(operation);
- return count.longValue();
- }
-
- public VirtualHost getVirtualHost()
- {
- VirtualHost virtualHost = mock(VirtualHost.class);
- when(virtualHost.getName()).thenReturn(getName());
- when(virtualHost.getId()).thenReturn(UUID.randomUUID());
- return virtualHost;
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
deleted file mode 100644
index 7ec442b73d..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.upgrade;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import junit.framework.TestCase;
-
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.Transaction;
-
-public class DatabaseTemplateTest extends TestCase
-{
- private static final String SOURCE_DATABASE = "sourceDatabase";
- private Environment _environment;
- private Database _sourceDatabase;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _environment = mock(Environment.class);
- _sourceDatabase = mock(Database.class);
- when(_environment.openDatabase(any(Transaction.class), same(SOURCE_DATABASE), isA(DatabaseConfig.class)))
- .thenReturn(_sourceDatabase);
- }
-
- public void testExecuteWithTwoDatabases()
- {
- String targetDatabaseName = "targetDatabase";
- Database targetDatabase = mock(Database.class);
-
- Transaction txn = mock(Transaction.class);
-
- when(_environment.openDatabase(same(txn), same(targetDatabaseName), isA(DatabaseConfig.class)))
- .thenReturn(targetDatabase);
-
- DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, targetDatabaseName, txn);
-
- DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class);
- databaseTemplate.run(databaseOperation);
-
- verify(databaseOperation).run(_sourceDatabase, targetDatabase, txn);
- verify(_sourceDatabase).close();
- verify(targetDatabase).close();
- }
-
- public void testExecuteWithOneDatabases()
- {
- DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, null, null);
-
- DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class);
- databaseTemplate.run(databaseOperation);
-
- verify(databaseOperation).run(_sourceDatabase, (Database)null, (Transaction)null);
- verify(_sourceDatabase).close();
- }
-
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
deleted file mode 100644
index d0f9455d9a..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.upgrade;
-
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NONEXCLUSIVE_WITH_ERRONEOUS_OWNER;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingRecord;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingTuple;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKey;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKeyBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKeyBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueRecord;
-
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Transaction;
-
-public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
-{
- private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName";
- private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName";
- private static final String EXCHANGE_DB_NAME = "exchangeDb_v5";
- private static final String MESSAGE_META_DATA_DB_NAME = "messageMetaDataDb_v5";
- private static final String MESSAGE_CONTENT_DB_NAME = "messageContentDb_v5";
- private static final String DELIVERY_DB_NAME = "deliveryDb_v5";
- private static final String BINDING_DB_NAME = "queueBindingsDb_v5";
-
- @Override
- protected String getStoreDirectoryName()
- {
- return "bdbstore-v4";
- }
-
- public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception
- {
- UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost());
-
- assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES)));
-
- assertDatabaseRecordCount(DELIVERY_DB_NAME, TOTAL_MESSAGE_NUMBER);
- assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, TOTAL_MESSAGE_NUMBER);
- assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
-
- for (int i = 0; i < QUEUE_SIZES.length; i++)
- {
- assertQueueMessages(QUEUE_NAMES[i], QUEUE_SIZES[i]);
- }
-
- final List<BindingRecord> queueBindings = loadBindings();
-
- assertEquals("Unxpected bindings size", TOTAL_BINDINGS, queueBindings.size());
- assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", TOPIC_NAME, "");
- assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", SELECTOR_TOPIC_NAME, "testprop='true'");
- assertBindingRecord(queueBindings, QUEUE_NAME, "amq.direct", QUEUE_NAME, null);
- assertBindingRecord(queueBindings, NON_DURABLE_QUEUE_NAME, "amq.direct", NON_DURABLE_QUEUE_NAME, null);
- assertBindingRecord(queueBindings, NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, null);
-
- assertQueueHasOwner(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description");
-
- assertContent();
- }
-
- public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception
- {
- UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHost());
- HashSet<String> queues = new HashSet<String>(Arrays.asList(QUEUE_NAMES));
- assertTrue(NON_DURABLE_QUEUE_NAME + " should be in the list of queues" , queues.remove(NON_DURABLE_QUEUE_NAME));
-
- assertQueues(queues);
-
- assertDatabaseRecordCount(DELIVERY_DB_NAME, 13);
- assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 13);
- assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
-
- assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE, 1);
- assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, 1);
- assertQueueMessages(QUEUE_NAME, 10);
- assertQueueMessages(QUEUE_WITH_DLQ_NAME + "_DLQ", 1);
-
- final List<BindingRecord> queueBindings = loadBindings();
-
- assertEquals("Unxpected list size", TOTAL_BINDINGS - 2, queueBindings.size());
- assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", TOPIC_NAME, "");
- assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
- SELECTOR_TOPIC_NAME, "testprop='true'");
- assertBindingRecord(queueBindings, QUEUE_NAME, "amq.direct", QUEUE_NAME, null);
-
- assertQueueHasOwner(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description");
-
- assertContent();
- }
-
- private List<BindingRecord> loadBindings()
- {
- final BindingTuple bindingTuple = new BindingTuple();
- final List<BindingRecord> queueBindings = new ArrayList<BindingRecord>();
- CursorOperation databaseOperation = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- BindingRecord bindingRecord = bindingTuple.entryToObject(key);
-
- AMQShortString queueName = bindingRecord.getQueueName();
- AMQShortString exchangeName = bindingRecord.getExchangeName();
- AMQShortString routingKey = bindingRecord.getRoutingKey();
- FieldTable arguments = bindingRecord.getArguments();
- queueBindings.add(new BindingRecord(exchangeName, queueName, routingKey, arguments));
- }
- };
- new DatabaseTemplate(_environment, BINDING_DB_NAME, null).run(databaseOperation);
- return queueBindings;
- }
-
- private void assertBindingRecord(List<BindingRecord> queueBindings, String queueName, String exchangeName,
- String routingKey, String selectorKey)
- {
- BindingRecord record = null;
- for (BindingRecord bindingRecord : queueBindings)
- {
- if (bindingRecord.getQueueName().asString().equals(queueName)
- && bindingRecord.getExchangeName().asString().equals(exchangeName))
- {
- record = bindingRecord;
- break;
- }
- }
- assertNotNull("Binding is not found for queue " + queueName + " and exchange " + exchangeName, record);
- assertEquals("Unexpected routing key", routingKey, record.getRoutingKey().asString());
-
- if (selectorKey != null)
- {
- assertEquals("Unexpected selector key for " + queueName, selectorKey,
- record.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue()));
- }
- }
-
- private void assertQueueMessages(final String queueName, final int expectedQueueSize)
- {
- final Set<Long> messageIdsForQueue = assertDeliveriesForQueue(queueName, expectedQueueSize);
-
- assertMetadataForQueue(queueName, expectedQueueSize, messageIdsForQueue);
-
- assertContentForQueue(queueName, expectedQueueSize, messageIdsForQueue);
- }
-
- private Set<Long> assertDeliveriesForQueue(final String queueName, final int expectedQueueSize)
- {
- final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding();
- final AtomicInteger deliveryCounter = new AtomicInteger();
- final Set<Long> messagesForQueue = new HashSet<Long>();
-
- CursorOperation deliveryDatabaseOperation = new CursorOperation()
- {
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
- String thisQueueName = entryKey.getQueueName().asString();
- if (thisQueueName.equals(queueName))
- {
- deliveryCounter.incrementAndGet();
- messagesForQueue.add(entryKey.getMessageId());
- }
- }
- };
- new DatabaseTemplate(_environment, DELIVERY_DB_NAME, null).run(deliveryDatabaseOperation);
-
- assertEquals("Unxpected number of entries in delivery db for queue " + queueName, expectedQueueSize,
- deliveryCounter.get());
-
- return messagesForQueue;
- }
-
- private void assertMetadataForQueue(final String queueName, final int expectedQueueSize,
- final Set<Long> messageIdsForQueue)
- {
- final AtomicInteger metadataCounter = new AtomicInteger();
- CursorOperation databaseOperation = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- Long messageId = LongBinding.entryToLong(key);
-
- boolean messageIsForTheRightQueue = messageIdsForQueue.contains(messageId);
- if (messageIsForTheRightQueue)
- {
- metadataCounter.incrementAndGet();
- }
- }
- };
- new DatabaseTemplate(_environment, MESSAGE_META_DATA_DB_NAME, null).run(databaseOperation);
-
- assertEquals("Unxpected number of entries in metadata db for queue " + queueName, expectedQueueSize,
- metadataCounter.get());
- }
-
- private void assertContentForQueue(String queueName, int expectedQueueSize, final Set<Long> messageIdsForQueue)
- {
- final AtomicInteger contentCounter = new AtomicInteger();
- final MessageContentKeyBinding keyBinding = new MessageContentKeyBinding();
- CursorOperation cursorOperation = new CursorOperation()
- {
- private long _prevMsgId = -1;
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- MessageContentKey contentKey = keyBinding.entryToObject(key);
- long msgId = contentKey.getMessageId();
-
- if (_prevMsgId != msgId && messageIdsForQueue.contains(msgId))
- {
- contentCounter.incrementAndGet();
- }
-
- _prevMsgId = msgId;
- }
- };
- new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(cursorOperation);
-
- assertEquals("Unxpected number of entries in content db for queue " + queueName, expectedQueueSize,
- contentCounter.get());
- }
-
- private void assertQueues(Set<String> expectedQueueNames)
- {
- List<AMQShortString> durableSubNames = Collections.emptyList();
- final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames);
- final Set<String> actualQueueNames = new HashSet<String>();
-
- CursorOperation queueNameCollector = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- QueueRecord record = binding.entryToObject(value);
- String queueName = record.getNameShortString().asString();
- actualQueueNames.add(queueName);
- }
- };
- new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector);
-
- assertEquals("Unexpected queue names", expectedQueueNames, actualQueueNames);
- }
-
- private void assertQueueHasOwner(String queueName, final String expectedOwner)
- {
- List<AMQShortString> durableSubNames = Collections.emptyList();
- final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames);
- final AtomicReference<String> actualOwner = new AtomicReference<String>();
- final AtomicBoolean foundQueue = new AtomicBoolean(false);
-
- CursorOperation queueNameCollector = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- QueueRecord record = binding.entryToObject(value);
- String queueName = record.getNameShortString().asString();
- if (queueName.equals(queueName))
- {
- foundQueue.set(true);
- actualOwner.set(AMQShortString.toString(record.getOwner()));
- }
- }
- };
- new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector);
-
- assertTrue("Could not find queue in database", foundQueue.get());
- assertEquals("Queue has unexpected owner", expectedOwner, actualOwner.get());
- }
-
- private void assertContent()
- {
- final UpgradeFrom4To5.ContentBinding contentBinding = new UpgradeFrom4To5.ContentBinding();
- CursorOperation contentCursorOperation = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
- DatabaseEntry value)
- {
- long id = LongBinding.entryToLong(key);
- assertTrue("Unexpected id", id > 0);
- ByteBuffer content = contentBinding.entryToObject(value);
- assertNotNull("Unexpected content", content);
- }
- };
- new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(contentCursorOperation);
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
deleted file mode 100644
index 6351ee5205..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.upgrade;
-
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
-import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CONFIGURED_OBJECTS_DB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_CONTENT_DB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_DELIVERY_DB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_METADATA_DB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_XID_DB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_CONTENT_DB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_XID_DB_NAME;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.Transaction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.Xid;
-import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.ConfiguredObjectBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransactionBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewRecordImpl;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransactionBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldRecordImpl;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeUUIDBinding;
-import org.apache.qpid.server.util.MapJsonSerializer;
-
-public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(UpgradeFrom5To6Test.class);
- private static final String ARGUMENTS = "arguments";
-
- @Override
- protected String getStoreDirectoryName()
- {
- return "bdbstore-v5";
- }
-
- public void testPerformUpgrade() throws Exception
- {
- UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
- upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost());
-
- assertDatabaseRecordCounts();
- assertContent();
-
- assertConfiguredObjects();
- assertQueueEntries();
- }
-
- public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception
- {
- corruptDatabase();
-
- UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost());
-
- assertDatabaseRecordCounts();
-
- assertConfiguredObjects();
- assertQueueEntries();
- }
-
- public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception
- {
- corruptDatabase();
-
- UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
-
- UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO);
-
- upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHost());
-
- assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12);
- assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12);
-
- assertConfiguredObjects();
- assertQueueEntries();
- }
-
- public void testPerformXidUpgrade() throws Exception
- {
- File storeLocation = new File(TMP_FOLDER, getName());
- storeLocation.mkdirs();
- Environment environment = createEnvironment(storeLocation);
- try
- {
- populateOldXidEntries(environment);
- UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
- upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost());
- assertXidEntries(environment);
- }
- finally
- {
- try
- {
- environment.close();
- }
- finally
- {
- deleteDirectoryIfExists(storeLocation);
- }
-
- }
- }
-
- private void assertXidEntries(Environment environment)
- {
- final DatabaseEntry value = new DatabaseEntry();
- final DatabaseEntry key = getXidKey();
- new DatabaseTemplate(environment, NEW_XID_DB_NAME, null).run(new DatabaseRunnable()
- {
-
- @Override
- public void run(Database xidDatabase, Database nullDatabase, Transaction transaction)
- {
- xidDatabase.get(null, key, value, LockMode.DEFAULT);
- }
- });
- NewPreparedTransactionBinding newBinding = new NewPreparedTransactionBinding();
- NewPreparedTransaction newTransaction = newBinding.entryToObject(value);
- NewRecordImpl[] newEnqueues = newTransaction.getEnqueues();
- NewRecordImpl[] newDequeues = newTransaction.getDequeues();
- assertEquals("Unxpected new enqueus number", 1, newEnqueues.length);
- NewRecordImpl enqueue = newEnqueues[0];
- assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHost().getName()), enqueue.getId());
- assertEquals("Unxpected message id", 1, enqueue.getMessageNumber());
- assertEquals("Unxpected new dequeues number", 1, newDequeues.length);
- NewRecordImpl dequeue = newDequeues[0];
- assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHost().getName()), dequeue.getId());
- assertEquals("Unxpected message id", 2, dequeue.getMessageNumber());
- }
-
- private void populateOldXidEntries(Environment environment)
- {
-
- final DatabaseEntry value = new DatabaseEntry();
- OldRecordImpl[] enqueues = { new OldRecordImpl("TEST1", 1) };
- OldRecordImpl[] dequeues = { new OldRecordImpl("TEST2", 2) };
- OldPreparedTransaction oldPreparedTransaction = new OldPreparedTransaction(enqueues, dequeues);
- OldPreparedTransactionBinding oldPreparedTransactionBinding = new OldPreparedTransactionBinding();
- oldPreparedTransactionBinding.objectToEntry(oldPreparedTransaction, value);
-
- final DatabaseEntry key = getXidKey();
- new DatabaseTemplate(environment, OLD_XID_DB_NAME, null).run(new DatabaseRunnable()
- {
-
- @Override
- public void run(Database xidDatabase, Database nullDatabase, Transaction transaction)
- {
- xidDatabase.put(null, key, value);
- }
- });
- }
-
- protected DatabaseEntry getXidKey()
- {
- final DatabaseEntry value = new DatabaseEntry();
- byte[] globalId = { 1 };
- byte[] branchId = { 2 };
- Xid xid = new Xid(1l, globalId, branchId);
- XidBinding xidBinding = XidBinding.getInstance();
- xidBinding.objectToEntry(xid, value);
- return value;
- }
-
- private void assertQueueEntries()
- {
- final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- final NewQueueEntryBinding newBinding = new NewQueueEntryBinding();
- CursorOperation cursorOperation = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- NewQueueEntryKey newEntryRecord = newBinding.entryToObject(key);
- assertTrue("Unexpected queue id", configuredObjects.containsKey(newEntryRecord.getQueueId()));
- }
- };
- new DatabaseTemplate(_environment, NEW_DELIVERY_DB_NAME, null).run(cursorOperation);
- }
-
- /**
- * modify the chunk offset of a message to be wrong, so we can test logic
- * that preserves incomplete messages
- */
- private void corruptDatabase()
- {
- CursorOperation cursorOperation = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- CompoundKeyBinding binding = new CompoundKeyBinding();
- CompoundKey originalCompoundKey = binding.entryToObject(key);
- int corruptedOffset = originalCompoundKey.getOffset() + 2;
- CompoundKey corruptedCompoundKey = new CompoundKey(originalCompoundKey.getMessageId(), corruptedOffset);
- DatabaseEntry newKey = new DatabaseEntry();
- binding.objectToEntry(corruptedCompoundKey, newKey);
-
- _logger.info("Deliberately corrupted message id " + originalCompoundKey.getMessageId()
- + ", changed offset from " + originalCompoundKey.getOffset() + " to "
- + corruptedCompoundKey.getOffset());
-
- deleteCurrent();
- sourceDatabase.put(transaction, newKey, value);
-
- abort();
- }
- };
-
- Transaction transaction = _environment.beginTransaction(null, null);
- new DatabaseTemplate(_environment, OLD_CONTENT_DB_NAME, transaction).run(cursorOperation);
- transaction.commit();
- }
-
- private void assertDatabaseRecordCounts()
- {
- assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 21);
- assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 13);
-
- assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 13);
- assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 13);
- }
-
- private void assertConfiguredObjects()
- {
- Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- assertEquals("Unexpected number of configured objects", 21, configuredObjects.size());
-
- Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(12);
- List<UUID> expectedBindingIDs = new ArrayList<UUID>();
-
- expected.add(createExpectedQueueMap("myUpgradeQueue", Boolean.FALSE, null, null));
- expected.add(createExpectedQueueMap("clientid:mySelectorDurSubName", Boolean.TRUE, "clientid", null));
- expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null));
-
- final Map<String, Object> queueWithOwnerArguments = new HashMap<String, Object>();
- queueWithOwnerArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES, 10);
- queueWithOwnerArguments.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, "misused-owner-as-description");
- expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null,queueWithOwnerArguments));
-
- final Map<String, Object> priorityQueueArguments = new HashMap<String, Object>();
- priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES, 10);
- expected.add(createExpectedQueueMap(PRIORITY_QUEUE_NAME, Boolean.FALSE, null, priorityQueueArguments));
-
- final Map<String, Object> queueWithDLQArguments = new HashMap<String, Object>();
- queueWithDLQArguments.put("x-qpid-dlq-enabled", true);
- queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2);
- expected.add(createExpectedQueueMap(QUEUE_WITH_DLQ_NAME, Boolean.FALSE, null, queueWithDLQArguments));
-
- final Map<String, Object> dlqArguments = new HashMap<String, Object>();
- dlqArguments.put("x-qpid-dlq-enabled", false);
- dlqArguments.put("x-qpid-maximum-delivery-count", 0);
- expected.add(createExpectedQueueMap(QUEUE_WITH_DLQ_NAME + "_DLQ", Boolean.FALSE, null, dlqArguments));
- expected.add(createExpectedExchangeMap(QUEUE_WITH_DLQ_NAME + "_DLE", "fanout"));
-
- expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue","myUpgradeQueue", "<<default>>", null, expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue", "myUpgradeQueue", "amq.direct", null, expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID("clientid:myDurSubName", "myUpgradeTopic", "amq.topic",
- Collections.singletonMap("x-filter-jms-selector", ""), expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID("clientid:mySelectorDurSubName", "mySelectorUpgradeTopic", "amq.topic",
- Collections.singletonMap("x-filter-jms-selector", "testprop='true'"), expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID("clientid:myDurSubName", "clientid:myDurSubName", "<<default>>", null, expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID("clientid:mySelectorDurSubName", "clientid:mySelectorDurSubName", "<<default>>", null, expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner", "nonexclusive-with-erroneous-owner", "amq.direct", null, expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner","nonexclusive-with-erroneous-owner", "<<default>>", null, expectedBindingIDs));
-
- expected.add(createExpectedQueueBindingMapAndID(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_NAME, "<<default>>", null, expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_NAME, "amq.direct", null, expectedBindingIDs));
-
- expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME, "<<default>>", null, expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME, "amq.direct", null, expectedBindingIDs));
- expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME + "_DLQ", "dlq", QUEUE_WITH_DLQ_NAME + "_DLE", null, expectedBindingIDs));
-
- Set<String> expectedTypes = new HashSet<String>();
- expectedTypes.add(Queue.class.getName());
- expectedTypes.add(Exchange.class.getName());
- expectedTypes.add(Binding.class.getName());
- MapJsonSerializer jsonSerializer = new MapJsonSerializer();
- for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet())
- {
- UpgradeConfiguredObjectRecord object = entry.getValue();
- Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes());
-
- assertTrue("Unexpected entry in a store - json [" + object.getAttributes() + "], map [" + deserialized + "]",
- expected.remove(deserialized));
- String type = object.getType();
- assertTrue("Unexpected type:" + type, expectedTypes.contains(type));
- UUID key = entry.getKey();
-
- assertNotNull("Key cannot be null", key);
-
- if (type.equals(Exchange.class.getName()))
- {
- String exchangeName = (String) deserialized.get(Exchange.NAME);
- assertNotNull(exchangeName);
- assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()));
- }
- else if (type.equals(Queue.class.getName()))
- {
- String queueName = (String) deserialized.get(Queue.NAME);
- assertNotNull(queueName);
- assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName()));
- }
- else if (type.equals(Binding.class.getName()))
- {
- assertTrue("unexpected binding id", expectedBindingIDs.remove(key));
- }
- }
-
- assertTrue("Not all expected configured objects found:" + expected, expected.isEmpty());
- assertTrue("Not all expected bindings found:" + expectedBindingIDs, expectedBindingIDs.isEmpty());
- }
-
- private Map<String, Object> createExpectedQueueBindingMapAndID(String queue, String bindingName, String exchangeName, Map<String, String> argumentMap, List<UUID> expectedBindingIDs)
- {
- Map<String, Object> expectedQueueBinding = new HashMap<String, Object>();
- expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHost().getName()).toString());
- expectedQueueBinding.put(Binding.NAME, bindingName);
- expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()).toString());
- if (argumentMap != null)
- {
- expectedQueueBinding.put(Binding.ARGUMENTS, argumentMap);
- }
-
- expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHost().getName()));
-
- return expectedQueueBinding;
- }
-
- private Map<String, Object> createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map<String, Object> argumentMap)
- {
- Map<String, Object> expectedQueueEntry = new HashMap<String, Object>();
- expectedQueueEntry.put(Queue.NAME, name);
- expectedQueueEntry.put(Queue.EXCLUSIVE, exclusiveFlag);
- expectedQueueEntry.put(Queue.OWNER, owner);
- if (argumentMap != null)
- {
- expectedQueueEntry.put(ARGUMENTS, argumentMap);
- }
- return expectedQueueEntry;
- }
-
- private Map<String, Object> createExpectedExchangeMap(String name, String type)
- {
- Map<String, Object> expectedExchnageEntry = new HashMap<String, Object>();
- expectedExchnageEntry.put(Exchange.NAME, name);
- expectedExchnageEntry.put(Exchange.TYPE, type);
- expectedExchnageEntry.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name());
- return expectedExchnageEntry;
- }
-
- private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects()
- {
- final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>();
- final ConfiguredObjectBinding binding = new ConfiguredObjectBinding();
- final UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding();
- CursorOperation configuredObjectsCursor = new CursorOperation()
- {
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- UUID id = uuidBinding.entryToObject(key);
- UpgradeConfiguredObjectRecord object = binding.entryToObject(value);
- configuredObjectsRecords.put(id, object);
- }
- };
- new DatabaseTemplate(_environment, CONFIGURED_OBJECTS_DB_NAME, null).run(configuredObjectsCursor);
- return configuredObjectsRecords;
- }
-
- private void assertContent()
- {
- final NewDataBinding contentBinding = new NewDataBinding();
- CursorOperation contentCursorOperation = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- long id = LongBinding.entryToLong(key);
- assertTrue("Unexpected id", id > 0);
- byte[] content = contentBinding.entryToObject(value);
- assertNotNull("Unexpected content", content);
- }
- };
- new DatabaseTemplate(_environment, NEW_CONTENT_DB_NAME, null).run(contentCursorOperation);
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java
deleted file mode 100644
index fc7142e9e4..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.upgrade;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.util.MapJsonSerializer;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Transaction;
-
-public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
-{
- private static final String ARGUMENTS = "arguments";
-
- private static final String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
- private static final String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY";
-
- @Override
- public VirtualHost<?,?,?> getVirtualHost()
- {
- VirtualHost<?,?,?> virtualHost = mock(VirtualHost.class);
- when(virtualHost.getName()).thenReturn("test");
- return virtualHost;
- }
-
- @Override
- protected String getStoreDirectoryName()
- {
- return "bdbstore-v7";
- }
-
- public void testPerformUpgrade() throws Exception
- {
- UpgradeFrom7To8 upgrade = new UpgradeFrom7To8();
- upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost());
-
- assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 11);
- assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 13);
-
- assertConfiguredObjects();
- assertConfiguredObjectHierarchy();
- }
-
-
- private void assertConfiguredObjectHierarchy()
- {
- Map<UpgradeHierarchyKey, UUID> hierarchy = loadConfiguredObjectHierarchy();
- assertEquals("Unexpected number of configured objects", 13, hierarchy.size());
-
- UUID vhUuid = UUIDGenerator.generateVhostUUID(getVirtualHost().getName());
- UUID myExchUuid = UUIDGenerator.generateExchangeUUID("myexch", getVirtualHost().getName());
- UUID amqDirectUuid = UUIDGenerator.generateExchangeUUID("amq.direct", getVirtualHost().getName());
- UUID queue1Uuid = UUIDGenerator.generateQueueUUID("queue1", getVirtualHost().getName());
- UUID queue1ToAmqDirectBindingUuid = UUIDGenerator.generateBindingUUID("amq.direct", "queue1", "queue1", getVirtualHost().getName());
-
- // myexch -> virtualhost
- UpgradeHierarchyKey myExchToVhParent = new UpgradeHierarchyKey(myExchUuid, VirtualHost.class.getSimpleName());
- assertExpectedHierarchyEntry(hierarchy, myExchToVhParent, vhUuid);
-
- // queue1 -> virtualhost
- UpgradeHierarchyKey queue1ToVhParent = new UpgradeHierarchyKey(queue1Uuid, VirtualHost.class.getSimpleName());
- assertExpectedHierarchyEntry(hierarchy, queue1ToVhParent, vhUuid);
-
- // queue1binding -> amq.direct
- // queue1binding -> queue1
- UpgradeHierarchyKey queue1BindingToAmqDirect = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Exchange.class.getSimpleName());
- UpgradeHierarchyKey queue1BindingToQueue1 = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Queue.class.getSimpleName());
- assertExpectedHierarchyEntry(hierarchy, queue1BindingToAmqDirect, amqDirectUuid);
- assertExpectedHierarchyEntry(hierarchy, queue1BindingToQueue1, queue1Uuid);
-
- String[] defaultExchanges = {"amq.topic", "amq.fanout", "amq.direct", "amq.match"};
- for (String exchangeName : defaultExchanges)
- {
- UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName());
- UpgradeHierarchyKey exchangeParent = new UpgradeHierarchyKey(id, VirtualHost.class.getSimpleName());
- assertExpectedHierarchyEntry(hierarchy, exchangeParent, vhUuid);
- }
-
- }
-
- private void assertExpectedHierarchyEntry(
- Map<UpgradeHierarchyKey, UUID> hierarchy,
- UpgradeHierarchyKey childHierarchyKey, UUID parentUUID)
- {
- assertTrue("Expected hierarchy entry does not exist", hierarchy.containsKey(childHierarchyKey));
- assertEquals("Expected hierarchy entry does not exist", parentUUID, hierarchy.get(childHierarchyKey));
- }
-
-
- private void assertConfiguredObjects()
- {
- Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- assertEquals("Unexpected number of configured objects", 11, configuredObjects.size());
-
- Map<UUID, Map<String, Object>> expected = new HashMap<UUID, Map<String, Object>>();
-
- String configVersion = "0.3";
- expected.putAll(createExpectedVirtualHost(configVersion));
-
- expected.putAll(createExpectedQueue("queue1", Boolean.FALSE, null, null));
- expected.putAll(createExpectedQueue("queue2", Boolean.FALSE, null, null));
-
- expected.putAll(createExpectedExchangeMap("myexch", "direct"));
-
- expected.putAll(createExpectedBindingMap("queue1", "queue1", "amq.direct", null));
- expected.putAll(createExpectedBindingMap("queue1", "queue1", "myexch", null));
- expected.putAll(createExpectedBindingMap("queue2", "queue2", "amq.fanout", null));
-
- expected.putAll(createExpectedExchangeMap("amq.direct", "direct"));
- expected.putAll(createExpectedExchangeMap("amq.fanout", "fanout"));
- expected.putAll(createExpectedExchangeMap("amq.match", "headers"));
- expected.putAll(createExpectedExchangeMap("amq.topic", "topic"));
-
- MapJsonSerializer jsonSerializer = new MapJsonSerializer();
- for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet())
- {
- UpgradeConfiguredObjectRecord object = entry.getValue();
-
- UUID actualKey = entry.getKey();
- String actualType = object.getType();
- String actualJson = object.getAttributes();
- Map<String, Object> actualDeserializedAttributes = jsonSerializer.deserialize(actualJson);
-
- assertTrue("Entry UUID " + actualKey + " of type " + actualType + " is unexpected", expected.containsKey(actualKey));
-
- Map<String, Object> expectedDeserializedAttributes = expected.get(actualKey);
-
- assertEquals("Entry UUID " + actualKey + " of type " + actualType + " has uenxpected deserialised value, json was: " + actualJson,
- expectedDeserializedAttributes, actualDeserializedAttributes);
- }
- }
-
- private Map<UUID, Map<String, Object>> createExpectedVirtualHost(String modelVersion)
- {
- Map<String, Object> expectedVirtualHostEntry = new HashMap<String, Object>();
- expectedVirtualHostEntry.put("modelVersion", modelVersion);
- expectedVirtualHostEntry.put(VirtualHost.NAME, getVirtualHost().getName());
-
- UUID expectedUUID = UUIDGenerator.generateVhostUUID(getVirtualHost().getName());
- return Collections.singletonMap(expectedUUID, expectedVirtualHostEntry);
- }
-
- private Map<UUID, Map<String, Object>> createExpectedQueue(String queueName, boolean exclusiveFlag, String owner, Map<String, Object> argumentMap)
- {
- Map<String, Object> expectedQueueEntry = new HashMap<String, Object>();
- expectedQueueEntry.put(Queue.NAME, queueName);
- expectedQueueEntry.put(Queue.EXCLUSIVE, exclusiveFlag);
- expectedQueueEntry.put(Queue.OWNER, owner);
- expectedQueueEntry.put(Queue.TYPE, "standard");
-
- if (argumentMap != null)
- {
- expectedQueueEntry.put(ARGUMENTS, argumentMap);
- }
- UUID expectedUUID = UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName());
- return Collections.singletonMap(expectedUUID, expectedQueueEntry);
- }
-
- private Map<UUID, Map<String, Object>> createExpectedExchangeMap(String exchangeName, String type)
- {
- Map<String, Object> expectedExchangeMap = new HashMap<String, Object>();
- expectedExchangeMap.put(Exchange.NAME, exchangeName);
- expectedExchangeMap.put(Exchange.TYPE, type);
- expectedExchangeMap.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name());
- UUID expectedUUID = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName());
- return Collections.singletonMap(expectedUUID, expectedExchangeMap);
- }
-
- private Map<UUID, Map<String, Object>> createExpectedBindingMap(String queueName, String bindingName, String exchangeName, Map<String, String> argumentMap)
- {
- Map<String, Object> expectedBinding = new HashMap<String, Object>();
- expectedBinding.put(Binding.NAME, bindingName);
- expectedBinding.put(Binding.ARGUMENTS, argumentMap == null ? Collections.emptyMap() : argumentMap);
-
- UUID expectedUUID = UUIDGenerator.generateBindingUUID(exchangeName, queueName, bindingName, getVirtualHost().getName());
- return Collections.singletonMap(expectedUUID, expectedBinding);
- }
-
- private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects()
- {
- final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>();
- final UpgradeConfiguredObjectBinding binding = new UpgradeConfiguredObjectBinding();
- final UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding();
- CursorOperation configuredObjectsCursor = new CursorOperation()
- {
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- UUID id = uuidBinding.entryToObject(key);
- UpgradeConfiguredObjectRecord object = binding.entryToObject(value);
- configuredObjectsRecords.put(id, object);
- }
- };
- new DatabaseTemplate(_environment, CONFIGURED_OBJECTS_DB_NAME, null).run(configuredObjectsCursor);
- return configuredObjectsRecords;
- }
-
-
- private Map<UpgradeHierarchyKey, UUID> loadConfiguredObjectHierarchy()
- {
- final Map<UpgradeHierarchyKey, UUID> hierarchyRecords = new HashMap<UpgradeHierarchyKey, UUID>();
- final UpgradeHierarchyKeyBinding hierarchyKeyBinding = new UpgradeHierarchyKeyBinding();
- final UpgradeUUIDBinding uuidParentBinding = new UpgradeUUIDBinding();
- CursorOperation hierarchyCursor = new CursorOperation()
- {
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
- DatabaseEntry key, DatabaseEntry value)
- {
- UpgradeHierarchyKey hierarchyKey = hierarchyKeyBinding.entryToObject(key);
- UUID parentId = uuidParentBinding.entryToObject(value);
- hierarchyRecords.put(hierarchyKey, parentId);
- }
- };
- new DatabaseTemplate(_environment, CONFIGURED_OBJECT_HIERARCHY_DB_NAME, null).run(hierarchyCursor);
- return hierarchyRecords;
- }
-
- private static class UpgradeConfiguredObjectBinding extends TupleBinding<UpgradeConfiguredObjectRecord>
- {
- @Override
- public UpgradeConfiguredObjectRecord entryToObject(TupleInput tupleInput)
- {
- String type = tupleInput.readString();
- String json = tupleInput.readString();
- UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(type, json);
- return configuredObject;
- }
-
- @Override
- public void objectToEntry(UpgradeConfiguredObjectRecord object, TupleOutput tupleOutput)
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class UpgradeConfiguredObjectRecord
- {
- private final String _attributes;
- private final String _type;
-
- public UpgradeConfiguredObjectRecord(String type, String attributes)
- {
- super();
- _attributes = attributes;
- _type = type;
- }
-
- public String getAttributes()
- {
- return _attributes;
- }
-
- public String getType()
- {
- return _type;
- }
-
- }
-
- private static class UpgradeUUIDBinding extends TupleBinding<UUID>
- {
- @Override
- public UUID entryToObject(final TupleInput tupleInput)
- {
- return new UUID(tupleInput.readLong(), tupleInput.readLong());
- }
-
- @Override
- public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput)
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class UpgradeHierarchyKeyBinding extends TupleBinding<UpgradeHierarchyKey>
- {
- @Override
- public UpgradeHierarchyKey entryToObject(TupleInput tupleInput)
- {
- UUID childId = new UUID(tupleInput.readLong(), tupleInput.readLong());
- String parentType = tupleInput.readString();
-
- return new UpgradeHierarchyKey(childId, parentType);
- }
-
- @Override
- public void objectToEntry(UpgradeHierarchyKey hk, TupleOutput tupleOutput)
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class UpgradeHierarchyKey
- {
- private final UUID _childId;
- private final String _parentType;
-
- public UpgradeHierarchyKey(final UUID childId, final String parentType)
- {
- _childId = childId;
- _parentType = parentType;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final UpgradeHierarchyKey that = (UpgradeHierarchyKey) o;
-
- if (!_childId.equals(that._childId))
- {
- return false;
- }
- if (!_parentType.equals(that._parentType))
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result = _childId.hashCode();
- result = 31 * result + _parentType.hashCode();
- return result;
- }
-
- }
-
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
deleted file mode 100644
index 5b869b67dc..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.upgrade;
-
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.OperationStatus;
-
-import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-
-public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
-{
- private Upgrader _upgrader;
-
- @Override
- protected String getStoreDirectoryName()
- {
- return "bdbstore-v999";
- }
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _upgrader = new Upgrader(_environment, getVirtualHost());
- }
-
- private int getStoreVersion()
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- int storeVersion = -1;
- Database versionDb = null;
- Cursor cursor = null;
- try
- {
- versionDb = _environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig);
- cursor = versionDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS)
- {
- int version = IntegerBinding.entryToInt(key);
- if (storeVersion < version)
- {
- storeVersion = version;
- }
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- if (versionDb != null)
- {
- versionDb.close();
- }
- }
- return storeVersion;
- }
-
- public void testUpgrade() throws Exception
- {
- assertEquals("Unexpected store version", 999, getStoreVersion());
- try
- {
- _upgrader.upgradeIfNecessary();
- fail("Store should not be able to be upgraded");
- }
- catch(ServerScopedRuntimeException ex)
- {
- assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: "
- + BDBConfigurationStore.VERSION, ex.getMessage());
- }
- }
-
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
deleted file mode 100644
index 54dd08fd98..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.upgrade;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
-
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-
-public class UpgraderTest extends AbstractUpgradeTestCase
-{
- private Upgrader _upgrader;
-
- @Override
- protected String getStoreDirectoryName()
- {
- return "bdbstore-v4";
- }
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _upgrader = new Upgrader(_environment, getVirtualHost());
- }
-
- private int getStoreVersion(Environment environment)
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- int storeVersion = -1;
- Database versionDb = null;
- Cursor cursor = null;
- try
- {
- versionDb = environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig);
- cursor = versionDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS)
- {
- int version = IntegerBinding.entryToInt(key);
- if (storeVersion < version)
- {
- storeVersion = version;
- }
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- if (versionDb != null)
- {
- versionDb.close();
- }
- }
- return storeVersion;
- }
-
- public void testUpgrade() throws Exception
- {
- assertEquals("Unexpected store version", -1, getStoreVersion(_environment));
- _upgrader.upgradeIfNecessary();
- assertEquals("Unexpected store version", BDBConfigurationStore.VERSION, getStoreVersion(_environment));
- assertContent();
- }
-
- public void testEmptyDatabaseUpgradeDoesNothing() throws Exception
- {
- File nonExistentStoreLocation = new File(TMP_FOLDER, getName());
- deleteDirectoryIfExists(nonExistentStoreLocation);
-
- nonExistentStoreLocation.mkdir();
- Environment emptyEnvironment = createEnvironment(nonExistentStoreLocation);
- try
- {
- _upgrader = new Upgrader(emptyEnvironment, getVirtualHost());
- _upgrader.upgradeIfNecessary();
-
- List<String> databaseNames = emptyEnvironment.getDatabaseNames();
- List<String> expectedDatabases = new ArrayList<String>();
- expectedDatabases.add(Upgrader.VERSION_DB_NAME);
- assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
- assertEquals("Unexpected store version", BDBConfigurationStore.VERSION, getStoreVersion(emptyEnvironment));
-
- }
- finally
- {
- emptyEnvironment.close();
- nonExistentStoreLocation.delete();
- }
- }
-
- private void assertContent()
- {
- final ContentBinding contentBinding = ContentBinding.getInstance();
- CursorOperation contentCursorOperation = new CursorOperation()
- {
-
- @Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
- DatabaseEntry value)
- {
- long id = LongBinding.entryToLong(key);
- assertTrue("Unexpected id", id > 0);
- byte[] content = contentBinding.entryToObject(value);
- assertNotNull("Unexpected content", content);
- }
- };
- new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation);
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImplTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImplTest.java
deleted file mode 100644
index 24a2ddb071..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImplTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.berkeleydb;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.TestFileUtils;
-import org.apache.qpid.util.FileUtils;
-
-public class BDBVirtualHostImplTest extends QpidTestCase
-{
- private File _storePath;
- private VirtualHostNode<?> _node;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- Broker broker = BrokerTestHelper.createBrokerMock();
-
- TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
- when(broker.getTaskExecutor()).thenReturn(taskExecutor);
- when(broker.getChildExecutor()).thenReturn(taskExecutor);
-
-
- _storePath = TestFileUtils.createTestDirectory();
-
- _node = mock(VirtualHostNode.class);
- when(_node.getParent(Broker.class)).thenReturn(broker);
- when(_node.getModel()).thenReturn(BrokerModel.getInstance());
- when(_node.getTaskExecutor()).thenReturn(taskExecutor);
- when(_node.getChildExecutor()).thenReturn(taskExecutor);
- when(_node.getConfigurationStore()).thenReturn(mock(DurableConfigurationStore.class));
- when(_node.getId()).thenReturn(UUID.randomUUID());
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_storePath != null)
- {
- FileUtils.delete(_storePath, true);
- }
- }
- finally
- {
- super.tearDown();
- }
- }
-
- public void testValidateOnCreateForInvalidStorePath() throws Exception
- {
- String hostName = getTestName();
- File file = new File(_storePath + File.separator + hostName);
- assertTrue("Empty file is not created", file.createNewFile());
- Map<String, Object> attributes = new HashMap<>();
- attributes.put(BDBVirtualHost.ID, UUID.randomUUID());
- attributes.put(BDBVirtualHost.TYPE, BDBVirtualHostImpl.VIRTUAL_HOST_TYPE);
- attributes.put(BDBVirtualHost.NAME, hostName);
- attributes.put(BDBVirtualHost.STORE_PATH, file.getAbsoluteFile());
-
- BDBVirtualHostImpl host = new BDBVirtualHostImpl(attributes, _node);
- try
- {
- host.create();
- fail("Cannot create DBD virtual host from existing empty file");
- }
- catch (IllegalConfigurationException e)
- {
- assertTrue("Unexpected exception " + e.getMessage(), e.getMessage().startsWith("Cannot open virtual host message store"));
- }
- }
-
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java
deleted file mode 100644
index 4fe2bdc97f..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.virtualhostnode.berkeleydb;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.security.AccessControlException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class BDBHARemoteReplicationNodeTest extends QpidTestCase
-{
- private final org.apache.qpid.server.security.SecurityManager _mockSecurityManager = mock(SecurityManager.class);
-
- private Broker _broker;
- private TaskExecutor _taskExecutor;
- private BDBHAVirtualHostNode<?> _virtualHostNode;
- private DurableConfigurationStore _configStore;
- private ReplicatedEnvironmentFacade _facade;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- _facade = mock(ReplicatedEnvironmentFacade.class);
-
- _broker = BrokerTestHelper.createBrokerMock();
-
- _taskExecutor = new CurrentThreadTaskExecutor();
- _taskExecutor.start();
- when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
- when(_broker.getChildExecutor()).thenReturn(_taskExecutor);
-
- _virtualHostNode = mock(BDBHAVirtualHostNode.class);
- _configStore = mock(DurableConfigurationStore.class);
- when(_virtualHostNode.getConfigurationStore()).thenReturn(_configStore);
-
- // Virtualhost needs the EventLogger from the SystemContext.
- when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker);
- doReturn(VirtualHostNode.class).when(_virtualHostNode).getCategoryClass();
- ConfiguredObjectFactory objectFactory = _broker.getObjectFactory();
- when(_virtualHostNode.getModel()).thenReturn(objectFactory.getModel());
- when(_virtualHostNode.getTaskExecutor()).thenReturn(_taskExecutor);
- when(_virtualHostNode.getChildExecutor()).thenReturn(_taskExecutor);
-
- }
-
- public void testUpdateRole()
- {
- String remoteReplicationName = getName();
- BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName);
-
- remoteReplicationNode.setAttribute(BDBHARemoteReplicationNode.ROLE, remoteReplicationNode.getRole(), NodeRole.MASTER);
-
- verify(_facade).transferMasterAsynchronously(remoteReplicationName);
- }
-
- public void testDelete()
- {
- String remoteReplicationName = getName();
- BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName);
-
- remoteReplicationNode.delete();
-
- verify(_facade).removeNodeFromGroup(remoteReplicationName);
- }
-
- // *************** ReplicationNode Access Control Tests ***************
-
- public void testUpdateDeniedByACL()
- {
- when(_broker.getSecurityManager()).thenReturn(_mockSecurityManager);
-
- String remoteReplicationName = getName();
- BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName);
-
- doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(remoteReplicationNode);
-
- assertNull(remoteReplicationNode.getDescription());
-
- try
- {
- remoteReplicationNode.setAttribute(VirtualHost.DESCRIPTION, null, "My description");
- fail("Exception not thrown");
- }
- catch (AccessControlException ace)
- {
- // PASS
- }
- }
-
- public void testDeleteDeniedByACL()
- {
- when(_broker.getSecurityManager()).thenReturn(_mockSecurityManager);
-
- String remoteReplicationName = getName();
- BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName);
-
- doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseDelete(remoteReplicationNode);
-
- assertNull(remoteReplicationNode.getDescription());
-
- try
- {
- remoteReplicationNode.delete();
- fail("Exception not thrown");
- }
- catch (AccessControlException ace)
- {
- // PASS
- }
- }
-
- private BDBHARemoteReplicationNode createRemoteReplicationNode(final String replicationNodeName)
- {
- Map<String, Object> attributes = new HashMap<>();
- attributes.put(BDBHARemoteReplicationNode.NAME, replicationNodeName);
- attributes.put(BDBHARemoteReplicationNode.MONITOR, Boolean.FALSE);
-
- BDBHARemoteReplicationNodeImpl node = new BDBHARemoteReplicationNodeImpl(_virtualHostNode, attributes, _facade);
- node.create();
- return node;
- }
-
-
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java
deleted file mode 100644
index 6d740568ab..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhostnode.berkeleydb;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
-
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.SystemConfig;
-import org.apache.qpid.server.store.berkeleydb.NoopConfigurationChangeListener;
-import org.apache.qpid.test.utils.PortHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.hamcrest.Description;
-import org.mockito.ArgumentMatcher;
-
-/**
- * Class to test that specific VHN operations result in the expected Operational Log message(s) being performed.
- */
-public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
-{
- private BDBHAVirtualHostNodeTestHelper _helper;
- private EventLogger _eventLogger;
- private PortHelper _portHelper = new PortHelper();
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _helper = new BDBHAVirtualHostNodeTestHelper(getTestName());
- _eventLogger = mock(EventLogger.class);
- SystemConfig<?> context = (SystemConfig<?>) _helper.getBroker().getParent(SystemConfig.class);
- when(context.getEventLogger()).thenReturn(_eventLogger);
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- try
- {
- _helper.tearDown();
- }
- finally
- {
- super.tearDown();
- }
-
- _portHelper.waitUntilAllocatedPortsAreFree();
- }
-
- public void testCreate() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
-
- _helper.assertNodeRole(node1, NodeRole.MASTER);
-
- // stop node to avoid running into race when role change is reported after we performed the check
- node1.stop();
-
- assertEquals("Unexpected VHN log subject", "[grp(/group)/vhn(/node1)] ", node1.getVirtualHostNodeLogSubject().getLogString());
- assertEquals("Unexpected group log subject", "[grp(/group)] ", node1.getGroupLogSubject().getLogString());
-
- String expectedMessage = HighAvailabilityMessages.CREATED().toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.CREATED_LOG_HIERARCHY)));
-
- expectedMessage = HighAvailabilityMessages.ROLE_CHANGED(node1.getName(), node1.getAddress(), NodeRole.WAITING.name(), NodeRole.MASTER.name()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ROLE_CHANGED_LOG_HIERARCHY)));
- }
-
- public void testDelete() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, NodeRole.MASTER);
-
- reset(_eventLogger);
-
- node1.delete();
-
- String expectedMessage = HighAvailabilityMessages.DELETED().toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DELETED_LOG_HIERARCHY)));
-
- }
-
- public void testSetPriority() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, NodeRole.MASTER);
-
- reset(_eventLogger);
-
- node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, 10));
-
- // make sure that task executor thread finishes all scheduled tasks
- node1.stop();
-
- String expectedMessage = HighAvailabilityMessages.PRIORITY_CHANGED("10").toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.PRIORITY_CHANGED_LOG_HIERARCHY)));
- }
-
- public void testSetQuorumOverride() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, NodeRole.MASTER);
-
- reset(_eventLogger);
-
- node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
-
- // make sure that task executor thread finishes all scheduled tasks
- node1.stop();
-
- String expectedMessage = HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED("1").toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY)));
- }
-
- public void testSetDesignatedPrimary() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, NodeRole.MASTER);
-
- reset(_eventLogger);
-
- node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true));
-
- // make sure that task executor thread finishes all scheduled tasks
- node1.stop();
-
- String expectedMessage = HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED("true").toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED_LOG_HIERARCHY)));
- }
-
- public void testRemoteNodeAdded() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, NodeRole.MASTER);
-
- reset(_eventLogger);
-
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- BDBHAVirtualHostNodeImpl node2 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node2Attributes);
- _helper.awaitRemoteNodes(node1, 1);
-
- // make sure that task executor thread finishes all scheduled tasks
- node2.stop();
-
- // Verify ADDED message from node1 when it discovers node2 has been added
- String expectedMessage = HighAvailabilityMessages.ADDED(node2.getName(), node2.getAddress()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ADDED_LOG_HIERARCHY)));
- }
-
- public void testRemoteNodeRemoved() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber);
- node1Attributes.put(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, NodeRole.MASTER);
-
- resetEventLogger();
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- BDBHAVirtualHostNodeImpl node2 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node2Attributes);
- _helper.awaitRemoteNodes(node1, 1);
-
- reset(_eventLogger);
-
- node2.delete();
- _helper.awaitRemoteNodes(node1, 0);
-
- // make sure that task executor thread finishes all scheduled tasks
- node1.stop();
-
- String expectedMessage = HighAvailabilityMessages.REMOVED(node2.getName(), node2.getAddress()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.REMOVED_LOG_HIERARCHY)));
- }
-
- public void testRemoteNodeDetached() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber);
- node1Attributes.put(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
-
- final CountDownLatch remoteNodeAdded = new CountDownLatch(1);
- node1.addChangeListener(new NoopConfigurationChangeListener()
- {
- @Override
- public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
- {
- if (child instanceof BDBHARemoteReplicationNode)
- {
- remoteNodeAdded.countDown();
- }
- }
- });
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- BDBHAVirtualHostNodeImpl node2 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node2Attributes);
-
- assertTrue("Remote node was not added during expected period of time", remoteNodeAdded.await(10, TimeUnit.SECONDS));
-
-
- BDBHARemoteReplicationNodeImpl remoteNode = (BDBHARemoteReplicationNodeImpl)node1.getRemoteReplicationNodes().iterator().next();
- waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.REPLICA));
-
-
- reset(_eventLogger);
-
- // close remote node
- node2.close();
-
-
- waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.UNREACHABLE));
-
- // make sure that task executor thread finishes all scheduled tasks
- node1.stop();
-
- // verify that remaining node issues the DETACHED operational logging for remote node
- String expectedMessage = HighAvailabilityMessages.LEFT(node2.getName(), node2.getAddress()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.LEFT_LOG_HIERARCHY)));
- }
-
-
- public void testRemoteNodeReAttached() throws Exception
- {
- int node1PortNumber = _portHelper.getNextAvailable();
- int node2PortNumber = _portHelper.getNextAvailable();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber);
- node1Attributes.put(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, NodeRole.MASTER);
-
- resetEventLogger();
-
- Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
- BDBHAVirtualHostNodeImpl node2 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node2Attributes);
- _helper.awaitRemoteNodes(node1, 1);
-
- BDBHARemoteReplicationNodeImpl remoteNode = (BDBHARemoteReplicationNodeImpl)node1.getRemoteReplicationNodes().iterator().next();
- waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.REPLICA));
-
- node2.close();
-
- waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.UNREACHABLE));
-
- reset(_eventLogger);
-
- node2Attributes.put(BDBHAVirtualHostNode.PERMITTED_NODES,
- node1Attributes.get(BDBHAVirtualHostNode.PERMITTED_NODES));
- node2 = (BDBHAVirtualHostNodeImpl)_helper.recoverHaVHN(node2.getId(), node2Attributes);
- _helper.assertNodeRole(node2, NodeRole.REPLICA, NodeRole.MASTER);
- waitForRemoteNodeToAttainRole(remoteNode, EnumSet.of(NodeRole.REPLICA, NodeRole.MASTER));
-
- // make sure that task executor thread finishes all scheduled tasks
- node1.stop();
-
- final String expectedMessage = HighAvailabilityMessages.JOINED(node2.getName(), node2.getAddress()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.JOINED_LOG_HIERARCHY)));
- }
-
- private void waitForRemoteNodeToAttainRole(BDBHARemoteReplicationNode remoteNode, EnumSet<NodeRole> desiredRoles) throws Exception
- {
- int counter = 0;
- while (!desiredRoles.contains(remoteNode.getRole()) && counter<50)
- {
- Thread.sleep(100);
- counter++;
- }
- }
-
- private EventLogger resetEventLogger()
- {
- EventLogger eventLogger = mock(EventLogger.class);
- SystemConfig<?> context = (SystemConfig<?>) _helper.getBroker().getParent(SystemConfig.class);
- when(context.getEventLogger()).thenReturn(eventLogger);
- return eventLogger;
- }
-
- class LogMessageMatcher extends ArgumentMatcher<LogMessage>
- {
- private String _expectedMessage;
- private String _expectedMessageFailureDescription = null;
- private String _expectedHierarchy;
- private String _expectedHierarchyFailureDescription = null;
-
- public LogMessageMatcher(String expectedMessage, String expectedHierarchy)
- {
- _expectedMessage = expectedMessage;
- _expectedHierarchy = expectedHierarchy;
- }
-
- @Override
- public boolean matches(Object argument)
- {
- LogMessage logMessage = (LogMessage)argument;
-
- boolean expectedMessageMatches = _expectedMessage.equals(logMessage.toString());
- if (!expectedMessageMatches)
- {
- _expectedMessageFailureDescription = "Expected message does not match. Expected: " + _expectedMessage + ", actual: " + logMessage.toString();
- }
- boolean expectedHierarchyMatches = _expectedHierarchy.equals(logMessage.getLogHierarchy());
- if (!expectedHierarchyMatches)
- {
- _expectedHierarchyFailureDescription = "Expected hierarchy does not match. Expected: " + _expectedHierarchy + ", actual: " + logMessage.getLogHierarchy();
- }
-
- return expectedMessageMatches && expectedHierarchyMatches;
- }
-
- @Override
- public void describeTo(Description description)
- {
- if (_expectedMessageFailureDescription != null)
- {
- description.appendText(_expectedMessageFailureDescription);
- }
- if (_expectedHierarchyFailureDescription != null)
- {
- description.appendText(_expectedHierarchyFailureDescription);
- }
- }
- }
-
- class LogSubjectMatcher extends ArgumentMatcher<LogSubject>
- {
- private LogSubject _logSubject;
- private String _failureDescription = null;
-
- public LogSubjectMatcher(LogSubject logSubject)
- {
- _logSubject = logSubject;
- }
-
- @Override
- public boolean matches(Object argument)
- {
- final LogSubject logSubject = (LogSubject)argument;
- final boolean foundAMatch = _logSubject.toLogString().equals(logSubject.toLogString());
- if (!foundAMatch)
- {
- _failureDescription = "LogSubject does not match. Expected: " + _logSubject.toLogString() + ", actual : " + logSubject.toLogString();
- }
- return foundAMatch;
- }
-
- @Override
- public void describeTo(Description description)
- {
- if (_failureDescription != null)
- {
- description.appendText(_failureDescription);
- }
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java
deleted file mode 100644
index 9d0e905fc5..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhostnode.berkeleydb;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import com.sleepycat.je.rep.ReplicationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
-import org.apache.qpid.server.model.RemoteReplicationNode;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-import org.apache.qpid.server.store.UnresolvedConfiguredObject;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
-import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.FileUtils;
-
-/**
- * Helper class to make the tests of BDB HA Virtual Host Nodes simpler and more concise.
- */
-public class BDBHAVirtualHostNodeTestHelper
-{
- private final String _testName;
- private Broker<?> _broker;
- private File _bdbStorePath;
- private TaskExecutor _taskExecutor;
- private final ConfiguredObjectFactory _objectFactory = BrokerModel.getInstance().getObjectFactory();
- private final Set<BDBHAVirtualHostNode<?>> _nodes = new HashSet<>();
-
- public BDBHAVirtualHostNodeTestHelper(String testName) throws Exception
- {
- _testName = testName;
- _broker = BrokerTestHelper.createBrokerMock();
-
- _taskExecutor = new TaskExecutorImpl();
- _taskExecutor.start();
- when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
- when(_broker.getChildExecutor()).thenReturn(_taskExecutor);
-
-
- _bdbStorePath = new File(QpidTestCase.TMP_FOLDER, _testName + "." + System.currentTimeMillis());
- _bdbStorePath.deleteOnExit();
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- Exception firstException = null;
- for (VirtualHostNode<?> node : _nodes)
- {
- try
- {
- node.delete();
- }
- catch(Exception e)
- {
- if (firstException != null)
- {
- firstException = e;
- }
- }
- }
- if (firstException != null)
- {
- throw firstException;
- }
- }
- finally
- {
- if (_taskExecutor != null)
- {
- _taskExecutor.stopImmediately();
- }
- if (_bdbStorePath != null)
- {
- FileUtils.delete(_bdbStorePath, true);
- }
- }
- }
-
- public BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name)
- {
- for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes())
- {
- if (remoteNode.getName().equals(name))
- {
- return (BDBHARemoteReplicationNode<?>)remoteNode;
- }
- }
- return null;
- }
-
- public void awaitRemoteNodes(BDBHAVirtualHostNode<?> node, int expectedNodeNumber) throws InterruptedException
- {
- int counter = 0;
-
- @SuppressWarnings("rawtypes")
- Collection<? extends RemoteReplicationNode> remoteNodes = null;
- do
- {
- remoteNodes = node.getRemoteReplicationNodes();
- if (counter > 0)
- {
- Thread.sleep(100);
- }
- counter++;
- }
- // TODO: 30 seconds is quite a lot to wait, we need to reduce this limit
- while(remoteNodes.size() != expectedNodeNumber && counter<100);
- assertEquals("Unexpected node number", expectedNodeNumber, node.getRemoteReplicationNodes().size());
- }
-
- public void awaitForAttributeChange(ConfiguredObject<?> object, String name, Object expectedValue) throws InterruptedException
- {
- int awaitCounter = 0;
- while(!object.equals(object.getAttribute(name)) && awaitCounter < 50)
- {
- Thread.sleep(100);
- awaitCounter++;
- }
- assertEquals("Unexpected attribute " + name + " on " + object, expectedValue, object.getAttribute(name) );
- }
-
- public BDBHAVirtualHostNode<?> awaitAndFindNodeInRole(NodeRole desiredRole) throws InterruptedException
- {
- BDBHAVirtualHostNode<?> replica = null;
- int findReplicaCount = 0;
- while(replica == null)
- {
- replica = findNodeInRole(desiredRole);
- if (replica == null)
- {
- Thread.sleep(100);
- }
- if (findReplicaCount > 50)
- {
- fail("Could not find a node in role " + desiredRole);
- }
- findReplicaCount++;
- }
- return replica;
- }
-
- public BDBHAVirtualHostNode<?> findNodeInRole(NodeRole role)
- {
- for (BDBHAVirtualHostNode<?> node : _nodes)
- {
- if (role == node.getRole())
- {
- return node;
- }
- }
- return null;
- }
-
- public BDBHAVirtualHostNode<?> createHaVHN(Map<String, Object> attributes)
- {
- @SuppressWarnings("unchecked")
- BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) _objectFactory.create(VirtualHostNode.class, attributes, _broker);
- _nodes.add(node);
- return node;
- }
-
- public BDBHAVirtualHostNode<?> recoverHaVHN(UUID id, Map<String, Object> attributes)
- {
- Map<String,UUID> parents = new HashMap<>();
- parents.put(Broker.class.getSimpleName(),_broker.getId());
- ConfiguredObjectRecordImpl record = new ConfiguredObjectRecordImpl(id, VirtualHostNode.class.getSimpleName(), attributes, parents );
-
- @SuppressWarnings("unchecked")
- UnresolvedConfiguredObject<BDBHAVirtualHostNodeImpl> unresolved = _objectFactory.recover(record, _broker);
- BDBHAVirtualHostNode<?> node = unresolved.resolve();
- node.open();
- _nodes.add(node);
- return node;
- }
-
- public void assertNodeRole(BDBHAVirtualHostNode<?> node, NodeRole... roleName) throws InterruptedException
- {
- int iterationCounter = 0;
- boolean inRole =false;
- do
- {
- for (NodeRole role : roleName)
- {
- if (role == node.getRole())
- {
- inRole = true;
- break;
- }
- }
- if (!inRole)
- {
- Thread.sleep(50);
- }
- iterationCounter++;
- }
- while(!inRole && iterationCounter<100);
- assertTrue("Node " + node.getName() + " did not transit into role " + Arrays.toString(roleName)
- + " Node role is " + node.getRole(), inRole);
- }
-
- public BDBHAVirtualHostNode<?> createAndStartHaVHN(Map<String, Object> attributes) throws InterruptedException
- {
- BDBHAVirtualHostNode<?> node = createHaVHN(attributes);
- return startNodeAndWait(node);
- }
-
- public BDBHAVirtualHostNode<?> startNodeAndWait(BDBHAVirtualHostNode<?> node) throws InterruptedException
- {
- node.start();
- assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA);
- assertEquals("Unexpected node state", State.ACTIVE, node.getState());
- return node;
- }
-
- public String getMessageStorePath()
- {
- return _bdbStorePath.getAbsolutePath();
- }
-
- public Broker getBroker()
- {
- return _broker;
- }
-
- public Map<String, Object> createNodeAttributes(String nodeName, String groupName, String address,
- String helperAddress, String helperNodeNode, int... ports)
- throws Exception
- {
- Map<String, Object> node1Attributes = new HashMap<String, Object>();
- node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
- node1Attributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE);
- node1Attributes.put(BDBHAVirtualHostNode.NAME, nodeName);
- node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
- node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, address);
- node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
- node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, getMessageStorePath() + File.separator + nodeName);
- if (address.equals(helperAddress))
- {
- node1Attributes.put(BDBHAVirtualHostNode.PERMITTED_NODES, getPermittedNodes(ports));
- }
- else
- {
- node1Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, helperNodeNode);
- }
-
- Map<String, String> context = new HashMap<String, String>();
- context.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
- context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
-
- if (ports != null)
- {
- String bluePrint = getBlueprint();
- node1Attributes.put(AbstractVirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, bluePrint);
- }
-
- node1Attributes.put(BDBHAVirtualHostNode.CONTEXT, context);
-
- return node1Attributes;
- }
-
- public static String getBlueprint() throws Exception
- {
- Map<String,Object> bluePrint = new HashMap<>();
- bluePrint.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE);
-
- StringWriter writer = new StringWriter();
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
- mapper.writeValue(writer, bluePrint);
- return writer.toString();
- }
-
- public static List<String> getPermittedNodes(int[] ports)
- {
- List<String> permittedNodes = new ArrayList<String>();
- for (int port:ports)
- {
- permittedNodes.add("localhost:" + port);
- }
- return permittedNodes;
- }
-
- public void awaitForVirtualhost(final VirtualHostNode<?> node, final int wait)
- {
- long endTime = System.currentTimeMillis() + wait;
- do
- {
- if(node.getVirtualHost() != null)
- {
- return;
- }
- try
- {
- Thread.sleep(100);
- }
- catch (InterruptedException e)
- {
- // ignore
- }
- }
- while(System.currentTimeMillis() < endTime);
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeTest.java
deleted file mode 100644
index 812d9a3b19..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhostnode.berkeleydb;
-
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.TestFileUtils;
-import org.apache.qpid.util.FileUtils;
-
-public class BDBVirtualHostNodeTest extends QpidTestCase
-{
- private Broker<?> _broker;
- private File _storePath;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _broker = BrokerTestHelper.createBrokerMock();
- TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
- when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
- when(_broker.getChildExecutor()).thenReturn(taskExecutor);
-
- _storePath = TestFileUtils.createTestDirectory();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_storePath != null)
- {
- FileUtils.delete(_storePath, true);
- }
- }
- finally
- {
- super.tearDown();
- }
- }
-
- public void testValidateOnCreateForInvalidStorePath() throws Exception
- {
- String nodeName = getTestName();
- File file = new File(_storePath + File.separator + nodeName);
- assertTrue("Empty file is not created", file.createNewFile());
- Map<String, Object> attributes = new HashMap<>();
- attributes.put(BDBVirtualHostNode.ID, UUID.randomUUID());
- attributes.put(BDBVirtualHostNode.TYPE, BDBVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE);
- attributes.put(BDBVirtualHostNode.NAME, nodeName);
- attributes.put(BDBVirtualHostNode.STORE_PATH, file.getAbsolutePath());
-
- BDBVirtualHostNodeImpl node = new BDBVirtualHostNodeImpl(attributes, _broker);
- try
- {
- node.create();
- fail("Cannot create DBD node from existing empty file");
- }
- catch (IllegalConfigurationException e)
- {
- assertTrue("Unexpected exception " + e.getMessage(), e.getMessage().startsWith("Cannot open node configuration store"));
- }
- }
-
-}
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
deleted file mode 100644
index cfc1f05d28..0000000000
--- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
+++ /dev/null
Binary files differ
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt
deleted file mode 100644
index a7e754f967..0000000000
--- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-The bdbstore v5 data were obtained by upgrading the bdbstore v4 data as part of running
-test UpgradeFrom4to5Test#testPerformUpgradeWithHandlerAnsweringNo.
-
-The rationale for not using BDBStoreUpgradeTestPreparer in this case is that we need chunked content.
-Current implementation of BDBMessageStore only stores messages in one chunk. \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
deleted file mode 100644
index cfc1f05d28..0000000000
--- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
+++ /dev/null
Binary files differ
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
deleted file mode 100644
index 4b45ff61e6..0000000000
--- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
+++ /dev/null
Binary files differ
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt
deleted file mode 100644
index efb929c944..0000000000
--- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-The bdbstore v7 data was obtained by running 0.26 and:
-
-* creating an exchange 'myexch' of type direct
-* creating queues 'queue1' and 'queue2'
-* binding 'queue1' to 'myexch' and 'amq.direct' using binding key 'queue1'
-* binding 'queue2' to amq.fanout only using binding key 'queue2' \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb
deleted file mode 100644
index 4957f86e1a..0000000000
--- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb
+++ /dev/null
Binary files differ
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb
deleted file mode 100644
index 991367019f..0000000000
--- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb
+++ /dev/null
Binary files differ