From 07c285f662e8f60d4e8aca247b65b77ca5df4587 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 28 Jun 2012 16:46:12 +0000 Subject: QPID-3998, QPID-3999, QPID-4093: add new management plugins for jmx/rest/webui functionality, partial merge from the java-config-and-management branch at r1355039 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1355072 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/bdbstore/build.xml | 2 +- qpid/java/bdbstore/jmx/MANIFEST.MF | 20 ++ qpid/java/bdbstore/jmx/build.xml | 29 +++ .../jmx/BDBHAMessageStoreManagerMBean.java | 232 ++++++++++++++++++++ .../jmx/BDBHAMessageStoreManagerMBeanProvider.java | 73 +++++++ .../berkeleydb/jmx/ManagedBDBHAMessageStore.java | 82 ++++++++ .../org.apache.qpid.server.jmx.MBeanProvider | 1 + .../store/berkeleydb/HAClusterManagementTest.java | 231 ++++++++++++++++++++ .../store/berkeleydb/HAClusterTwoNodeTest.java | 217 +++++++++++++++++++ .../jmx/BDBHAMessageStoreManagerMBeanTest.java | 233 +++++++++++++++++++++ .../server/store/berkeleydb/BDBHAMessageStore.java | 32 ++- .../berkeleydb/BDBHAMessageStoreManagerMBean.java | 214 ------------------- .../server/store/berkeleydb/BDBMessageStore.java | 8 + .../store/berkeleydb/ManagedBDBHAMessageStore.java | 82 -------- .../store/berkeleydb/upgrade/UpgradeFrom5To6.java | 45 +++- .../BDBHAMessageStoreManagerMBeanTest.java | 225 -------------------- .../BDBMessageStoreConfigurationTest.java | 20 ++ .../store/berkeleydb/BDBMessageStoreTest.java | 2 +- .../store/berkeleydb/HAClusterManagementTest.java | 230 -------------------- .../store/berkeleydb/HAClusterTwoNodeTest.java | 216 ------------------- .../upgrade/AbstractUpgradeTestCase.java | 4 +- .../berkeleydb/upgrade/UpgradeFrom4to5Test.java | 44 +++- .../berkeleydb/upgrade/UpgradeFrom5To6Test.java | 109 +++++----- .../upgrade/bdbstore-v4/test-store/00000000.jdb | Bin 1357197 -> 1361990 bytes .../upgrade/bdbstore-v5/test-store/00000000.jdb | Bin 1357227 -> 1361990 bytes .../upgrade/bdbstore-v5/test-store/00000001.jdb | Bin 1332881 -> 1333643 bytes 26 files changed, 1294 insertions(+), 1057 deletions(-) create mode 100644 qpid/java/bdbstore/jmx/MANIFEST.MF create mode 100644 qpid/java/bdbstore/jmx/build.xml create mode 100644 qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java create mode 100644 qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java create mode 100644 qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java create mode 100644 qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider create mode 100644 qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java create mode 100644 qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java create mode 100644 qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java delete mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java delete mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java (limited to 'qpid/java/bdbstore') diff --git a/qpid/java/bdbstore/build.xml b/qpid/java/bdbstore/build.xml index 7df048c691..7c305c7c2f 100644 --- a/qpid/java/bdbstore/build.xml +++ b/qpid/java/bdbstore/build.xml @@ -17,7 +17,7 @@ - under the License. --> - + diff --git a/qpid/java/bdbstore/jmx/MANIFEST.MF b/qpid/java/bdbstore/jmx/MANIFEST.MF new file mode 100644 index 0000000000..7046c4326d --- /dev/null +++ b/qpid/java/bdbstore/jmx/MANIFEST.MF @@ -0,0 +1,20 @@ +Manifest-Version: 1.0 +Bundle-ManifestVersion: 2 +Bundle-Name: Qpid Bdbstore-Plugins JMX +Bundle-SymbolicName: bdbstore-plugins-jmx +Bundle-Description: Bdbstore Management plugin for Qpid. +Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt +Bundle-DocURL: http://www.apache.org/ +Bundle-Version: 1.0.0 +Bundle-RequiredExecutionEnvironment: JavaSE-1.6 +Bundle-ClassPath: . +Fragment-Host: broker-plugins-jmx +Import-Package: org.apache.qpid, + org.apache.qpid.management.common.mbeans.annotations, + org.apache.qpid.server.model, + org.apache.qpid.server.virtualhost, + org.apache.qpid.server.store.berkeleydb, + org.apache.log4j;version=1.2.16, + javax.management, + javax.management.openmbean +Export-Package: org.apache.qpid.server.store.berkeleydb.jmx diff --git a/qpid/java/bdbstore/jmx/build.xml b/qpid/java/bdbstore/jmx/build.xml new file mode 100644 index 0000000000..2015b0cbb5 --- /dev/null +++ b/qpid/java/bdbstore/jmx/build.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java new file mode 100644 index 0000000000..455573f7bc --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.jmx; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; + +/** + * Management mbean for BDB HA. + *

+ * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is + * currently arranged through OSGI using the fragment feature so that this bundle shares the + * same classloader as broker-plugins-jmx. See the Fragment-Host: header within the MANIFEST.MF + * of this bundle. + *

+ */ +public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore +{ + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class); + + private static final TabularType GROUP_MEMBERS_TABLE; + private static final CompositeType GROUP_MEMBER_ROW; + private static final OpenType[] GROUP_MEMBER_ATTRIBUTE_TYPES; + + static + { + try + { + GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType[] {SimpleType.STRING, SimpleType.STRING}; + final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT}; + final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "}; + GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member", + itemNames, + itemDescriptions, + GROUP_MEMBER_ATTRIBUTE_TYPES ); + GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers", + GROUP_MEMBER_ROW, + new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME}); + } + catch (final OpenDataException ode) + { + throw new ExceptionInInitializerError(ode); + } + } + + private final BDBHAMessageStore _store; + + protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException + { + super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry()); + LOGGER.debug("Creating BDBHAMessageStoreManagerMBean"); + _store = store; + register(); + } + + @Override + public String getObjectInstanceName() + { + return _store.getName(); + } + + @Override + public String getGroupName() + { + return _store.getGroupName(); + } + + @Override + public String getNodeName() + { + return _store.getNodeName(); + } + + @Override + public String getNodeHostPort() + { + return _store.getNodeHostPort(); + } + + @Override + public String getHelperHostPort() + { + return _store.getHelperHostPort(); + } + + @Override + public String getDurability() throws IOException, JMException + { + try + { + return _store.getDurability(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query replication policy", e); + throw new JMException(e.getMessage()); + } + } + + + @Override + public boolean getCoalescingSync() throws IOException, JMException + { + return _store.isCoalescingSync(); + } + + @Override + public String getNodeState() throws IOException, JMException + { + try + { + return _store.getNodeState(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query node state", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public boolean getDesignatedPrimary() throws IOException, JMException + { + try + { + return _store.isDesignatedPrimary(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query designated primary", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public TabularData getAllNodesInGroup() throws IOException, JMException + { + final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE); + final List> members = _store.getGroupMembers(); + + for (Map map : members) + { + CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map); + data.put(memberData); + } + return data; + } + + @Override + public void removeNodeFromGroup(String nodeName) throws JMException + { + try + { + _store.removeNodeFromGroup(nodeName); + } + catch (AMQStoreException e) + { + LOGGER.error("Failed to remove node " + nodeName + " from group", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public void setDesignatedPrimary(boolean primary) throws JMException + { + try + { + _store.setDesignatedPrimary(primary); + } + catch (AMQStoreException e) + { + LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException + { + try + { + _store.updateAddress(nodeName, newHostName, newPort); + } + catch(AMQStoreException e) + { + LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e); + throw new JMException(e.getMessage()); + } + } + + @Override + public ManagedObject getParentObject() + { + return null; + } + +} diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java new file mode 100644 index 0000000000..837da1eef3 --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.jmx; + +import javax.management.JMException; +import javax.management.StandardMBean; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.jmx.MBeanProvider; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +/** + * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual + * host and of type {@link BDBHAMessageStore#BDB_HA_STORE_TYPE}. + * + */ +public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider +{ + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBeanProvider.class); + + public BDBHAMessageStoreManagerMBeanProvider() + { + super(); + } + + @Override + public boolean isChildManageableByMBean(ConfiguredObject child) + { + return (child instanceof VirtualHost + && BDBHAMessageStore.BDB_HA_STORE_TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE))); + } + + @Override + public StandardMBean createMBean(ConfiguredObject child, StandardMBean parent) throws JMException + { + VirtualHost virtualHostChild = (VirtualHost) child; + + VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry(); + org.apache.qpid.server.virtualhost.VirtualHost vhost = virtualHostRegistry.getVirtualHost(virtualHostChild.getName()); + + BDBHAMessageStore messageStore = (BDBHAMessageStore) vhost.getMessageStore(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Creating mBean for child " + child); + } + + return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent); + } +} diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java new file mode 100644 index 0000000000..b85e44526b --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.jmx; + +import java.io.IOException; + +import javax.management.JMException; +import javax.management.openmbean.TabularData; + +import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; + +public interface ManagedBDBHAMessageStore +{ + public static final String TYPE = "BDBHAMessageStore"; + + public static final String ATTR_GROUP_NAME = "GroupName"; + public static final String ATTR_NODE_NAME = "NodeName"; + public static final String ATTR_NODE_HOST_PORT = "NodeHostPort"; + public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort"; + public static final String ATTR_DURABILITY = "Durability"; + public static final String ATTR_NODE_STATE = "NodeState"; + public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; + public static final String ATTR_COALESCING_SYNC = "CoalescingSync"; + + @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") + String getGroupName() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group") + String getNodeName() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group") + String getNodeHostPort() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node") + String getNodeState() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") + String getHelperHostPort() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_DURABILITY, description="Durability") + String getDurability() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") + boolean getDesignatedPrimary() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.") + boolean getCoalescingSync() throws IOException, JMException; + + @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") + TabularData getAllNodesInGroup() throws IOException, JMException; + + @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") + void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName) throws JMException; + + @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.") + void setDesignatedPrimary(@MBeanOperationParameter(name="primary", description="designated primary")boolean primary) throws JMException; + + @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.") + void updateAddress(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName, + @MBeanOperationParameter(name="newHostName", description="new hostname")String newHostName, + @MBeanOperationParameter(name="newPort", description="new port number")int newPort) throws JMException; +} + diff --git a/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider b/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider new file mode 100644 index 0000000000..b5bc947612 --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider @@ -0,0 +1 @@ +org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBeanProvider diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java new file mode 100644 index 0000000000..45038bf050 --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.Connection; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.log4j.Logger; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.EnvironmentFailureException; + +/** + * System test verifying the ability to control a cluster via the Management API. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterManagementTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); + + private static final Set NON_MASTER_STATES = new HashSet(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final int NUMBER_OF_NODES = 4; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + _jmxUtils.setUp(); + + _clusterCreator.configureClusterNodes(); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testReadonlyMBeanAttributes() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); + assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); + assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); + assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); + // As we have chosen an arbitrary broker from the cluster, we cannot predict its state + assertNotNull("Store state must not be null", storeBean.getNodeState()); + } + + public void testStateOfActiveBrokerIsMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); + assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); + } + + public void testStateOfNonActiveBrokerIsNotMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + final String nodeState = storeBean.getNodeState(); + assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); + } + + public void testGroupMembers() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + final TabularData groupMembers = storeBean.getAllNodesInGroup(); + assertNotNull(groupMembers); + + final int numberOfDataRows = groupMembers.size(); + assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows); + + for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) + { + final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); + final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); + + CompositeData row = groupMembers.get(new Object[] {nodeName}); + assertNotNull("Table does not contain row for node name " + nodeName, row); + assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + } + + public void testRemoveNodeFromGroup() throws Exception + { + final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); + final int numberOfDataRows = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows); + + final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); + _clusterCreator.stopNode(brokerPortNumberToBeRemoved); + storeBean.removeNodeFromGroup(removedNodeName); + + final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval); + } + + /** + * Updates the address of a node. + * + * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit + * assert. + * + * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case + */ + public void testUpdateAddress() throws Exception + { + final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + _clusterCreator.startNode(brokerPortNumberToBeMoved); + } + + /** + * @see #testUpdateAddress() + */ + public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception + { + final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + // now deliberately don't call updateAddress + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + try + { + _clusterCreator.startNode(brokerPortNumberToBeMoved); + fail("Exception not thrown"); + } + catch(RuntimeException rte) + { + //check cause was BDBs EnvironmentFailureException + assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName())); + // PASS + } + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + } +} diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java new file mode 100644 index 0000000000..22877ec36c --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.rep.ReplicationConfig; + +public class HAClusterTwoNodeTest extends QpidBrokerTestCase +{ + private static final long RECEIVE_TIMEOUT = 5000l; + + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final int NUMBER_OF_NODES = 2; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + _jmxUtils.setUp(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + private void startCluster(boolean designedPrimary) throws Exception + { + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix(); + + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s"); + + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + } + + public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception + { + startCluster(true); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startNode(masterPort); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testClusterRestartWithoutDesignatedPrimary() throws Exception + { + startCluster(false); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startClusterParallel(); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + assertProducingConsuming(connection); + } + + public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception + { + startCluster(false); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + try + { + assertProducingConsuming(connection); + fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available"); + } + catch(JMSException e) + { + // JMSException should be thrown on transaction start/commit + } + } + + public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + + try + { + getConnection(_brokerFailoverUrl); + fail("Connection not expected"); + } + catch (JMSException e) + { + // PASS + } + } + + public void testInitialDesignatedPrimaryStateOfNodes() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); + assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); + + final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); + } + + public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + + assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); + + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to new primary", connection); + assertProducingConsuming(connection); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + return storeBean; + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 1); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + session.commit(); + } + +} diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java new file mode 100644 index 0000000000..49b3ddd3dc --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.jmx; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; + +public class BDBHAMessageStoreManagerMBeanTest extends TestCase +{ + private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_NODE_NAME = "testNodeName"; + private static final String TEST_NODE_HOST_PORT = "host:1234"; + private static final String TEST_HELPER_HOST_PORT = "host:5678"; + private static final String TEST_DURABILITY = "sync,sync,all"; + private static final String TEST_NODE_STATE = "MASTER"; + private static final String TEST_STORE_NAME = "testStoreName"; + private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false; + + private BDBHAMessageStore _store; + private BDBHAMessageStoreManagerMBean _mBean; + private AMQManagedObject _mBeanParent; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + _store = mock(BDBHAMessageStore.class); + _mBeanParent = mock(AMQManagedObject.class); + when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class)); + _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + CurrentActor.remove(); + } + + public void testObjectName() throws Exception + { + when(_store.getName()).thenReturn(TEST_STORE_NAME); + + String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME; + assertEquals(expectedObjectName, _mBean.getObjectName().toString()); + } + + public void testGroupName() throws Exception + { + when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME); + + assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME)); + } + + public void testNodeName() throws Exception + { + when(_store.getNodeName()).thenReturn(TEST_NODE_NAME); + + assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME)); + } + + public void testNodeHostPort() throws Exception + { + when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT); + + assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT)); + } + + public void testHelperHostPort() throws Exception + { + when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT); + + assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT)); + } + + public void testDurability() throws Exception + { + when(_store.getDurability()).thenReturn(TEST_DURABILITY); + + assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY)); + } + + public void testCoalescingSync() throws Exception + { + when(_store.isCoalescingSync()).thenReturn(true); + + assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC)); + } + + public void testNodeState() throws Exception + { + when(_store.getNodeState()).thenReturn(TEST_NODE_STATE); + + assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE)); + } + + public void testDesignatedPrimaryFlag() throws Exception + { + when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); + + assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY)); + } + + public void testGroupMembersForGroupWithOneNode() throws Exception + { + List> members = Collections.singletonList(createTestNodeResult()); + when(_store.getGroupMembers()).thenReturn(members); + + final TabularData resultsTable = _mBean.getAllNodesInGroup(); + + assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT); + + final int numberOfDataRows = resultsTable.size(); + assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows); + final CompositeData row = (CompositeData) resultsTable.values().iterator().next(); + assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME)); + assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + + public void testRemoveNodeFromReplicationGroup() throws Exception + { + _mBean.removeNodeFromGroup(TEST_NODE_NAME); + + verify(_store).removeNodeFromGroup(TEST_NODE_NAME); + } + + public void testRemoveNodeFromReplicationGroupWithError() throws Exception + { + doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME); + + try + { + _mBean.removeNodeFromGroup(TEST_NODE_NAME); + fail("Exception not thrown"); + } + catch (JMException je) + { + // PASS + } + } + + public void testSetAsDesignatedPrimary() throws Exception + { + _mBean.setDesignatedPrimary(true); + + verify(_store).setDesignatedPrimary(true); + } + + public void testSetAsDesignatedPrimaryWithError() throws Exception + { + doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true); + + try + { + _mBean.setDesignatedPrimary(true); + fail("Exception not thrown"); + } + catch (JMException je) + { + // PASS + } + } + + public void testUpdateAddress() throws Exception + { + String newHostName = "newHostName"; + int newPort = 1967; + + _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort); + + verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort); + } + + private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames) + { + CompositeType headingsRow = resultsTable.getTabularType().getRowType(); + for (final String headingName : headingNames) + { + assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName)); + } + } + + private Map createTestNodeResult() + { + Map items = new HashMap(); + items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); + items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); + return items; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index ab54d7d16a..c40f24dbc3 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -105,6 +105,8 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); }}); + public static final String BDB_HA_STORE_TYPE = "BDB-HA"; + private String _groupName; private String _nodeName; private String _nodeHostPort; @@ -113,8 +115,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private String _name; - private BDBHAMessageStoreManagerMBean _managedObject; - private CommitThreadWrapper _commitThreadWrapper; private boolean _coalescingSync; private boolean _designatedPrimary; @@ -149,8 +149,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC + "! Please set highAvailability.coalescingSync to false in store configuration."); } - _managedObject = new BDBHAMessageStoreManagerMBean(this); - _managedObject.register(); super.configure(name, storeConfig); } @@ -394,28 +392,18 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess @Override protected void closeInternal() throws Exception { + substituteNoOpStateChangeListenerOn(getReplicatedEnvironment()); + try { - substituteNoOpStateChangeListenerOn(getReplicatedEnvironment()); - - try - { - if(_coalescingSync) - { - _commitThreadWrapper.stopCommitThread(); - } - } - finally + if(_coalescingSync) { - super.closeInternal(); + _commitThreadWrapper.stopCommitThread(); } } finally { - if (_managedObject != null) - { - _managedObject.unregister(); - } + super.closeInternal(); } } @@ -610,4 +598,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { } } + + @Override + public String getStoreType() + { + return BDB_HA_STORE_TYPE; + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java deleted file mode 100644 index c2c7bf4c86..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import javax.management.JMException; -import javax.management.NotCompliantMBeanException; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.management.AMQManagedObject; - -public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore -{ - private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class); - - private static final TabularType GROUP_MEMBERS_TABLE; - private static final CompositeType GROUP_MEMBER_ROW; - private static final OpenType[] GROUP_MEMBER_ATTRIBUTE_TYPES; - - static - { - try - { - GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType[] {SimpleType.STRING, SimpleType.STRING}; - final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT}; - final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "}; - GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member", - itemNames, - itemDescriptions, - GROUP_MEMBER_ATTRIBUTE_TYPES ); - GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers", - GROUP_MEMBER_ROW, - new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME}); - } - catch (final OpenDataException ode) - { - throw new ExceptionInInitializerError(ode); - } - } - - private final BDBHAMessageStore _store; - - protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store) throws NotCompliantMBeanException - { - super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE); - _store = store; - } - - @Override - public String getObjectInstanceName() - { - return _store.getName(); - } - - @Override - public String getGroupName() - { - return _store.getGroupName(); - } - - @Override - public String getNodeName() - { - return _store.getNodeName(); - } - - @Override - public String getNodeHostPort() - { - return _store.getNodeHostPort(); - } - - @Override - public String getHelperHostPort() - { - return _store.getHelperHostPort(); - } - - @Override - public String getDurability() throws IOException, JMException - { - try - { - return _store.getDurability(); - } - catch (RuntimeException e) - { - LOGGER.debug("Failed query replication policy", e); - throw new JMException(e.getMessage()); - } - } - - - @Override - public boolean getCoalescingSync() throws IOException, JMException - { - return _store.isCoalescingSync(); - } - - @Override - public String getNodeState() throws IOException, JMException - { - try - { - return _store.getNodeState(); - } - catch (RuntimeException e) - { - LOGGER.debug("Failed query node state", e); - throw new JMException(e.getMessage()); - } - } - - @Override - public boolean getDesignatedPrimary() throws IOException, JMException - { - try - { - return _store.isDesignatedPrimary(); - } - catch (RuntimeException e) - { - LOGGER.debug("Failed query designated primary", e); - throw new JMException(e.getMessage()); - } - } - - @Override - public TabularData getAllNodesInGroup() throws IOException, JMException - { - final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE); - final List> members = _store.getGroupMembers(); - - for (Map map : members) - { - CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map); - data.put(memberData); - } - return data; - } - - @Override - public void removeNodeFromGroup(String nodeName) throws JMException - { - try - { - _store.removeNodeFromGroup(nodeName); - } - catch (AMQStoreException e) - { - LOGGER.error("Failed to remove node " + nodeName + " from group", e); - throw new JMException(e.getMessage()); - } - } - - @Override - public void setDesignatedPrimary(boolean primary) throws JMException - { - try - { - _store.setDesignatedPrimary(primary); - } - catch (AMQStoreException e) - { - LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e); - throw new JMException(e.getMessage()); - } - } - - @Override - public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException - { - try - { - _store.updateAddress(nodeName, newHostName, newPort); - } - catch(AMQStoreException e) - { - LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e); - throw new JMException(e.getMessage()); - } - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index d5bf5374bc..82bc3d8564 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -42,6 +42,7 @@ import com.sleepycat.je.EnvironmentConfig; public class BDBMessageStore extends AbstractBDBMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); + private static final String BDB_STORE_TYPE = "BDB"; private CommitThreadWrapper _commitThreadWrapper; @Override @@ -103,4 +104,11 @@ public class BDBMessageStore extends AbstractBDBMessageStore return _commitThreadWrapper.commit(tx, syncCommit); } + + @Override + public String getStoreType() + { + return BDB_STORE_TYPE; + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java deleted file mode 100644 index 6499ea04e0..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.IOException; - -import javax.management.JMException; -import javax.management.openmbean.TabularData; - -import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; -import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; -import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; - -public interface ManagedBDBHAMessageStore -{ - public static final String TYPE = "BDBHAMessageStore"; - - public static final String ATTR_GROUP_NAME = "GroupName"; - public static final String ATTR_NODE_NAME = "NodeName"; - public static final String ATTR_NODE_HOST_PORT = "NodeHostPort"; - public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort"; - public static final String ATTR_DURABILITY = "Durability"; - public static final String ATTR_NODE_STATE = "NodeState"; - public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; - public static final String ATTR_COALESCING_SYNC = "CoalescingSync"; - - @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") - String getGroupName() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group") - String getNodeName() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group") - String getNodeHostPort() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node") - String getNodeState() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") - String getHelperHostPort() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_DURABILITY, description="Durability") - String getDurability() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") - boolean getDesignatedPrimary() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.") - boolean getCoalescingSync() throws IOException, JMException; - - @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") - TabularData getAllNodesInGroup() throws IOException, JMException; - - @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") - void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName) throws JMException; - - @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.") - void setDesignatedPrimary(@MBeanOperationParameter(name="primary", description="designated primary")boolean primary) throws JMException; - - @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.") - void updateAddress(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName, - @MBeanOperationParameter(name="newHostName", description="new hostname")String newHostName, - @MBeanOperationParameter(name="newPort", description="new port number")int newPort) throws JMException; -} - diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 3265fb6823..97a3d61df1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -45,6 +45,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; import org.apache.qpid.server.util.MapJsonSerializer; @@ -93,6 +94,8 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private MapJsonSerializer _serializer = new MapJsonSerializer(); + private static final boolean _moveNonExclusiveQueueOwnerToDescription = Boolean.parseBoolean(System.getProperty("qpid.move_non_exclusive_queue_owner_to_description", Boolean.TRUE.toString())); + /** * Upgrades from a v5 database to a v6 database * @@ -554,17 +557,49 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private UpgradeConfiguredObjectRecord createQueueConfiguredObjectRecord(String queueName, String owner, boolean exclusive, FieldTable arguments) { + Map attributesMap = buildQueueArgumentMap(queueName, + owner, exclusive, arguments); + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json); + return configuredObject; + } + + private Map buildQueueArgumentMap(String queueName, + String owner, boolean exclusive, FieldTable arguments) + { + Map attributesMap = new HashMap(); attributesMap.put(Queue.NAME, queueName); - attributesMap.put(Queue.OWNER, owner); attributesMap.put(Queue.EXCLUSIVE, exclusive); + + FieldTable argumentsCopy = new FieldTable(); if (arguments != null) { - attributesMap.put("ARGUMENTS", FieldTable.convertToMap(arguments)); + argumentsCopy.addAll(arguments); } - String json = _serializer.serialize(attributesMap); - UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json); - return configuredObject; + + if (moveNonExclusiveOwnerToDescription(owner, exclusive)) + { + _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION); + + attributesMap.put(Queue.OWNER, null); + argumentsCopy.put(AMQShortString.valueOf(AMQQueueFactory.X_QPID_DESCRIPTION), owner); + } + else + { + attributesMap.put(Queue.OWNER, owner); + } + if (!argumentsCopy.isEmpty()) + { + attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(argumentsCopy)); + } + return attributesMap; + } + + private boolean moveNonExclusiveOwnerToDescription(String owner, + boolean exclusive) + { + return exclusive == false && owner != null && _moveNonExclusiveQueueOwnerToDescription; } private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType, diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java deleted file mode 100644 index b64a213756..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.management.JMException; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.TabularData; - -import junit.framework.TestCase; - -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; - -public class BDBHAMessageStoreManagerMBeanTest extends TestCase -{ - private static final String TEST_GROUP_NAME = "testGroupName"; - private static final String TEST_NODE_NAME = "testNodeName"; - private static final String TEST_NODE_HOST_PORT = "host:1234"; - private static final String TEST_HELPER_HOST_PORT = "host:5678"; - private static final String TEST_DURABILITY = "sync,sync,all"; - private static final String TEST_NODE_STATE = "MASTER"; - private static final String TEST_STORE_NAME = "testStoreName"; - private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false; - - private BDBHAMessageStore _store; - private BDBHAMessageStoreManagerMBean _mBean; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - - CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); - _store = mock(BDBHAMessageStore.class); - _mBean = new BDBHAMessageStoreManagerMBean(_store); - } - - @Override - protected void tearDown() throws Exception - { - super.tearDown(); - CurrentActor.remove(); - } - - public void testObjectName() throws Exception - { - when(_store.getName()).thenReturn(TEST_STORE_NAME); - - String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME; - assertEquals(expectedObjectName, _mBean.getObjectName().toString()); - } - - public void testGroupName() throws Exception - { - when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME); - - assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME)); - } - - public void testNodeName() throws Exception - { - when(_store.getNodeName()).thenReturn(TEST_NODE_NAME); - - assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME)); - } - - public void testNodeHostPort() throws Exception - { - when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT); - - assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT)); - } - - public void testHelperHostPort() throws Exception - { - when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT); - - assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT)); - } - - public void testDurability() throws Exception - { - when(_store.getDurability()).thenReturn(TEST_DURABILITY); - - assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY)); - } - - public void testCoalescingSync() throws Exception - { - when(_store.isCoalescingSync()).thenReturn(true); - - assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC)); - } - - public void testNodeState() throws Exception - { - when(_store.getNodeState()).thenReturn(TEST_NODE_STATE); - - assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE)); - } - - public void testDesignatedPrimaryFlag() throws Exception - { - when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); - - assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY)); - } - - public void testGroupMembersForGroupWithOneNode() throws Exception - { - List> members = Collections.singletonList(createTestNodeResult()); - when(_store.getGroupMembers()).thenReturn(members); - - final TabularData resultsTable = _mBean.getAllNodesInGroup(); - - assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT); - - final int numberOfDataRows = resultsTable.size(); - assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows); - final CompositeData row = (CompositeData) resultsTable.values().iterator().next(); - assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME)); - assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); - } - - public void testRemoveNodeFromReplicationGroup() throws Exception - { - _mBean.removeNodeFromGroup(TEST_NODE_NAME); - - verify(_store).removeNodeFromGroup(TEST_NODE_NAME); - } - - public void testRemoveNodeFromReplicationGroupWithError() throws Exception - { - doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME); - - try - { - _mBean.removeNodeFromGroup(TEST_NODE_NAME); - fail("Exception not thrown"); - } - catch (JMException je) - { - // PASS - } - } - - public void testSetAsDesignatedPrimary() throws Exception - { - _mBean.setDesignatedPrimary(true); - - verify(_store).setDesignatedPrimary(true); - } - - public void testSetAsDesignatedPrimaryWithError() throws Exception - { - doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true); - - try - { - _mBean.setDesignatedPrimary(true); - fail("Exception not thrown"); - } - catch (JMException je) - { - // PASS - } - } - - public void testUpdateAddress() throws Exception - { - String newHostName = "newHostName"; - int newPort = 1967; - - _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort); - - verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort); - } - - private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames) - { - CompositeType headingsRow = resultsTable.getTabularType().getRowType(); - for (final String headingName : headingNames) - { - assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName)); - } - } - - private Map createTestNodeResult() - { - Map items = new HashMap(); - items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); - items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); - return items; - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java index 687c671566..5cc436a22a 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.store.berkeleydb; import org.apache.qpid.server.store.DurableConfigurationStoreTest; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 591bc27d1e..e97323c5f2 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -55,7 +55,7 @@ import org.apache.qpid.transport.MessageTransfer; /** * Subclass of MessageStoreTest which runs the standard tests from the superclass against - * the BDB Store as well as additional tests specific to the DBB store-implementation. + * the BDB Store as well as additional tests specific to the BDB store-implementation. */ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java deleted file mode 100644 index b01f169715..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import javax.jms.Connection; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; - -import org.apache.log4j.Logger; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import com.sleepycat.je.EnvironmentFailureException; - -/** - * System test verifying the ability to control a cluster via the Management API. - * - * @see HAClusterBlackboxTest - */ -public class HAClusterManagementTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); - - private static final Set NON_MASTER_STATES = new HashSet(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; - private static final String VIRTUAL_HOST = "test"; - - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; - private static final int NUMBER_OF_NODES = 4; - - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); - - private ConnectionURL _brokerFailoverUrl; - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - _jmxUtils.setUp(); - - _clusterCreator.configureClusterNodes(); - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - _clusterCreator.startCluster(); - - super.setUp(); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _jmxUtils.close(); - } - finally - { - super.tearDown(); - } - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - public void testReadonlyMBeanAttributes() throws Exception - { - final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); - final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); - assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); - assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); - assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); - assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); - // As we have chosen an arbitrary broker from the cluster, we cannot predict its state - assertNotNull("Store state must not be null", storeBean.getNodeState()); - } - - public void testStateOfActiveBrokerIsMaster() throws Exception - { - final Connection activeConnection = getConnection(_brokerFailoverUrl); - final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); - assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); - } - - public void testStateOfNonActiveBrokerIsNotMaster() throws Exception - { - final Connection activeConnection = getConnection(_brokerFailoverUrl); - final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); - final String nodeState = storeBean.getNodeState(); - assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); - } - - public void testGroupMembers() throws Exception - { - final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); - final TabularData groupMembers = storeBean.getAllNodesInGroup(); - assertNotNull(groupMembers); - - final int numberOfDataRows = groupMembers.size(); - assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows); - - for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) - { - final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); - final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); - - CompositeData row = groupMembers.get(new Object[] {nodeName}); - assertNotNull("Table does not contain row for node name " + nodeName, row); - assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); - } - } - - public void testRemoveNodeFromGroup() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); - final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); - final int numberOfDataRows = storeBean.getAllNodesInGroup().size(); - assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows); - - final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); - _clusterCreator.stopNode(brokerPortNumberToBeRemoved); - storeBean.removeNodeFromGroup(removedNodeName); - - final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); - assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval); - } - - /** - * Updates the address of a node. - * - * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit - * assert. - * - * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case - */ - public void testUpdateAddress() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next(); - final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate); - - _clusterCreator.stopNode(brokerPortNumberToBeMoved); - - final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); - final int newBdbPort = getNextAvailable(oldBdbPort + 1); - - storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); - - _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); - - _clusterCreator.startNode(brokerPortNumberToBeMoved); - } - - /** - * @see #testUpdateAddress() - */ - public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); - - _clusterCreator.stopNode(brokerPortNumberToBeMoved); - - final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); - final int newBdbPort = getNextAvailable(oldBdbPort + 1); - - // now deliberately don't call updateAddress - - _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); - - try - { - _clusterCreator.startNode(brokerPortNumberToBeMoved); - fail("Exception not thrown"); - } - catch(RuntimeException rte) - { - //check cause was BDBs EnvironmentFailureException - assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName())); - // PASS - } - } - - private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( - final int activeBrokerPortNumber) throws Exception - { - _jmxUtils.open(activeBrokerPortNumber); - - return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java deleted file mode 100644 index 294859832f..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import com.sleepycat.je.rep.ReplicationConfig; - -public class HAClusterTwoNodeTest extends QpidBrokerTestCase -{ - private static final long RECEIVE_TIMEOUT = 5000l; - - private static final String VIRTUAL_HOST = "test"; - - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; - private static final int NUMBER_OF_NODES = 2; - - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); - - private ConnectionURL _brokerFailoverUrl; - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - _jmxUtils.setUp(); - - super.setUp(); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _jmxUtils.close(); - } - finally - { - super.tearDown(); - } - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - private void startCluster(boolean designedPrimary) throws Exception - { - setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - - String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix(); - - setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); - setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s"); - - setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); - setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); - - _clusterCreator.configureClusterNodes(); - _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - _clusterCreator.startCluster(); - } - - public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception - { - startCluster(true); - final Connection initialConnection = getConnection(_brokerFailoverUrl); - int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - assertProducingConsuming(initialConnection); - initialConnection.close(); - _clusterCreator.stopCluster(); - _clusterCreator.startNode(masterPort); - final Connection secondConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(secondConnection); - secondConnection.close(); - } - - public void testClusterRestartWithoutDesignatedPrimary() throws Exception - { - startCluster(false); - final Connection initialConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(initialConnection); - initialConnection.close(); - _clusterCreator.stopCluster(); - _clusterCreator.startClusterParallel(); - final Connection secondConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(secondConnection); - secondConnection.close(); - } - - public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception - { - startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - final Connection connection = getConnection(_brokerFailoverUrl); - assertNotNull("Expected to get a valid connection to primary", connection); - assertProducingConsuming(connection); - } - - public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception - { - startCluster(false); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - final Connection connection = getConnection(_brokerFailoverUrl); - assertNotNull("Expected to get a valid connection to primary", connection); - try - { - assertProducingConsuming(connection); - fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available"); - } - catch(JMSException e) - { - // JMSException should be thrown on transaction start/commit - } - } - - public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception - { - startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); - - try - { - getConnection(_brokerFailoverUrl); - fail("Connection not expected"); - } - catch (JMSException e) - { - // PASS - } - } - - public void testInitialDesignatedPrimaryStateOfNodes() throws Exception - { - startCluster(true); - final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); - assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); - - final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); - } - - public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception - { - startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - - assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); - storeBean.setDesignatedPrimary(true); - assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); - - final Connection connection = getConnection(_brokerFailoverUrl); - assertNotNull("Expected to get a valid connection to new primary", connection); - assertProducingConsuming(connection); - } - - private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( - final int activeBrokerPortNumber) throws Exception - { - _jmxUtils.open(activeBrokerPortNumber); - - ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); - return storeBean; - } - - private void assertProducingConsuming(final Connection connection) throws JMSException, Exception - { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination destination = session.createQueue(getTestQueueName()); - MessageConsumer consumer = session.createConsumer(destination); - sendMessage(session, destination, 1); - connection.start(); - Message m1 = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message 1 is not received", m1); - assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); - session.commit(); - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java index 36991b90d0..cd2654f79f 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java @@ -52,8 +52,8 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase } public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue", - "queue-non-durable" }; - public static int[] QUEUE_SIZES = { 1, 1, 10, 3 }; + "queue-non-durable", "nonexclusive-with-erroneous-owner" }; + public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0}; public static int TOTAL_MESSAGE_NUMBER = 15; protected static final LogSubject LOG_SUBJECT = new TestBlankSubject(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java index 3f9e4e4aa1..65a8bb03fb 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java @@ -23,10 +23,13 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; @@ -49,6 +52,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase { private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME; + private static final String NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner"; private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName"; private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName"; private static final String EXCHANGE_DB_NAME = "exchangeDb_v5"; @@ -87,6 +91,10 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'"); assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null); assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null); + assertBindingRecord(queueBindings, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, null); + + assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); + assertContent(); } @@ -94,7 +102,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName()); - assertQueues(new HashSet(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE))); + assertQueues(new HashSet(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER))); assertDatabaseRecordCount(DELIVERY_DB_NAME, 12); assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12); @@ -112,6 +120,9 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'"); assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null); + + assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); + assertContent(); } @@ -257,7 +268,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase private void assertQueues(Set expectedQueueNames) { - List durableSubNames = new ArrayList(); + List durableSubNames = Collections.emptyList(); final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames); final Set actualQueueNames = new HashSet(); @@ -278,6 +289,35 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase assertEquals("Unexpected queue names", expectedQueueNames, actualQueueNames); } + private void assertQueueHasOwner(String queueName, final String expectedOwner) + { + List durableSubNames = Collections.emptyList(); + final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames); + final AtomicReference actualOwner = new AtomicReference(); + final AtomicBoolean foundQueue = new AtomicBoolean(false); + + CursorOperation queueNameCollector = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + QueueRecord record = binding.entryToObject(value); + String queueName = record.getNameShortString().asString(); + if (queueName.equals(queueName)) + { + foundQueue.set(true); + actualOwner.set(AMQShortString.toString(record.getOwner())); + } + } + }; + new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector); + + assertTrue("Could not find queue in database", foundQueue.get()); + assertEquals("Queue has unexpected owner", expectedOwner, actualOwner.get()); + } + private void assertContent() { final UpgradeFrom4To5.ContentBinding contentBinding = new UpgradeFrom4To5.ContentBinding(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index 5297692820..0031447140 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -29,6 +29,7 @@ import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OL import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_XID_DB_NAME; import java.io.File; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -41,6 +42,7 @@ import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.store.berkeleydb.entry.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; @@ -260,7 +262,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase private void assertDatabaseRecordCounts() { - assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 9); + assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 12); assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 12); assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); @@ -270,64 +272,25 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase private void assertConfiguredObjects() { Map configuredObjects = loadConfiguredObjects(); - assertEquals("Unexpected number of configured objects", 9, configuredObjects.size()); - - Set> expected = new HashSet>(9); - Map queue1 = new HashMap(); - queue1.put("exclusive", Boolean.FALSE); - queue1.put("name", "myUpgradeQueue"); - queue1.put("owner", null); - expected.add(queue1); - Map queue2 = new HashMap(); - queue2.put("exclusive", Boolean.TRUE); - queue2.put("name", "clientid:mySelectorDurSubName"); - queue2.put("owner", "clientid"); - expected.add(queue2); - Map queue3 = new HashMap(); - queue3.put("exclusive", Boolean.TRUE); - queue3.put("name", "clientid:myDurSubName"); - queue3.put("owner", "clientid"); - expected.add(queue3); - - Map queueBinding1 = new HashMap(); - queueBinding1.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString()); - queueBinding1.put("name", "myUpgradeQueue"); - queueBinding1.put("exchange", UUIDGenerator.generateUUID("<>", getVirtualHostName()).toString()); - expected.add(queueBinding1); - Map queueBinding2 = new HashMap(); - queueBinding2.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString()); - queueBinding2.put("name", "myUpgradeQueue"); - queueBinding2.put("exchange", UUIDGenerator.generateUUID("amq.direct", getVirtualHostName()).toString()); - Map arguments2 = new HashMap(); - arguments2.put("x-filter-jms-selector", ""); - queueBinding2.put("arguments", arguments2); - expected.add(queueBinding2); - Map queueBinding3 = new HashMap(); - queueBinding3.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString()); - queueBinding3.put("name", "myUpgradeTopic"); - queueBinding3.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString()); - Map arguments3 = new HashMap(); - arguments3.put("x-filter-jms-selector", ""); - queueBinding3.put("arguments", arguments3); - expected.add(queueBinding3); - Map queueBinding4 = new HashMap(); - queueBinding4.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString()); - queueBinding4.put("name", "mySelectorUpgradeTopic"); - queueBinding4.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString()); - Map arguments4 = new HashMap(); - arguments4.put("x-filter-jms-selector", "testprop='true'"); - queueBinding4.put("arguments", arguments4); - expected.add(queueBinding4); - Map queueBinding5 = new HashMap(); - queueBinding5.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString()); - queueBinding5.put("name", "clientid:myDurSubName"); - queueBinding5.put("exchange", UUIDGenerator.generateUUID("<>", getVirtualHostName()).toString()); - expected.add(queueBinding5); - Map queueBinding6 = new HashMap(); - queueBinding6.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString()); - queueBinding6.put("name", "clientid:mySelectorDurSubName"); - queueBinding6.put("exchange", UUIDGenerator.generateUUID("<>", getVirtualHostName()).toString()); - expected.add(queueBinding6); + assertEquals("Unexpected number of configured objects", 12, configuredObjects.size()); + + Set> expected = new HashSet>(12); + expected.add(createExpectedQueueMap("myUpgradeQueue", Boolean.FALSE, null, null)); + expected.add(createExpectedQueueMap("clientid:mySelectorDurSubName", Boolean.TRUE, "clientid", null)); + expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null)); + expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null, + Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description"))); + + expected.add(createExpectedQueueBindingMap("myUpgradeQueue","myUpgradeQueue", "<>", null)); + expected.add(createExpectedQueueBindingMap("myUpgradeQueue", "myUpgradeQueue", "amq.direct", null)); + expected.add(createExpectedQueueBindingMap("clientid:myDurSubName", "myUpgradeTopic", "amq.topic", + Collections.singletonMap("x-filter-jms-selector", ""))); + expected.add(createExpectedQueueBindingMap("clientid:mySelectorDurSubName", "mySelectorUpgradeTopic", "amq.topic", + Collections.singletonMap("x-filter-jms-selector", "testprop='true'"))); + expected.add(createExpectedQueueBindingMap("clientid:myDurSubName", "clientid:myDurSubName", "<>", null)); + expected.add(createExpectedQueueBindingMap("clientid:mySelectorDurSubName", "clientid:mySelectorDurSubName", "<>", null)); + expected.add(createExpectedQueueBindingMap("nonexclusive-with-erroneous-owner", "nonexclusive-with-erroneous-owner", "amq.direct", null)); + expected.add(createExpectedQueueBindingMap("nonexclusive-with-erroneous-owner","nonexclusive-with-erroneous-owner", "<>", null)); Set expectedTypes = new HashSet(); expectedTypes.add(Queue.class.getName()); @@ -337,11 +300,11 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase for (Entry entry : configuredObjects.entrySet()) { UpgradeConfiguredObjectRecord object = entry.getValue(); - UUID key = entry.getKey(); Map deserialized = jsonSerializer.deserialize(object.getAttributes()); assertTrue("Unexpected entry:" + object.getAttributes(), expected.remove(deserialized)); String type = object.getType(); assertTrue("Unexpected type:" + type, expectedTypes.contains(type)); + UUID key = entry.getKey(); if (type.equals(Exchange.class.getName()) || type.equals(Queue.class.getName())) { assertEquals("Unexpected key", key, UUIDGenerator.generateUUID(((String) deserialized.get("name")), getVirtualHostName())); @@ -354,6 +317,32 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase assertTrue("Not all expected configured objects found:" + expected, expected.isEmpty()); } + private Map createExpectedQueueBindingMap(String queue, String bindingName, String exchangeName, Map argumentMap) + { + Map expectedQueueBinding = new HashMap(); + expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateUUID(queue, getVirtualHostName()).toString()); + expectedQueueBinding.put(Binding.NAME, bindingName); + expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateUUID(exchangeName, getVirtualHostName()).toString()); + if (argumentMap != null) + { + expectedQueueBinding.put(Binding.ARGUMENTS, argumentMap); + } + return expectedQueueBinding; + } + + private Map createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map argumentMap) + { + Map expectedQueueEntry = new HashMap(); + expectedQueueEntry.put(Queue.NAME, name); + expectedQueueEntry.put(Queue.EXCLUSIVE, exclusiveFlag); + expectedQueueEntry.put(Queue.OWNER, owner); + if (argumentMap != null) + { + expectedQueueEntry.put(Queue.ARGUMENTS, argumentMap); + } + return expectedQueueEntry; + } + private Map loadConfiguredObjects() { final Map configuredObjectsRecords = new HashMap(); diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb index 167ab7f0ca..f5ed9aa5a2 100644 Binary files a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb and b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb differ diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb index d44b21a83e..f5ed9aa5a2 100644 Binary files a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb and b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb differ diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb index 9b85860c19..d5ae8c1096 100644 Binary files a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb and b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb differ -- cgit v1.2.1