From 566f96394dd04b81c3e2987a221dd935a7ab2276 Mon Sep 17 00:00:00 2001 From: Alex Rudyy Date: Fri, 9 May 2014 13:24:24 +0000 Subject: QPID-5409 : Change BDB HA MBean to delegate operations to BDB HA node instance git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1593538 13f79535-47bb-0310-9956-ffa450edef68 --- .../jmx/BDBHAMessageStoreManagerMBean.java | 117 ++++++++++--------- .../jmx/BDBHAMessageStoreManagerMBeanProvider.java | 12 +- .../berkeleydb/jmx/ManagedBDBHAMessageStore.java | 3 + .../jmx/BDBHAMessageStoreManagerMBeanTest.java | 125 ++++++++++----------- .../replication/ReplicatedEnvironmentFacade.java | 19 ---- .../berkeleydb/BDBHARemoteReplicationNodeImpl.java | 60 +++++----- .../berkeleydb/BDBHAVirtualHostNodeImpl.java | 2 +- .../store/berkeleydb/BDBHAVirtualHostNodeTest.java | 24 +++- .../berkeleydb/BDBHAVirtualHostNodeRestTest.java | 12 +- .../store/berkeleydb/HAClusterManagementTest.java | 86 ++++---------- .../qpid/server/jmx/JMXManagementPluginImpl.java | 63 ++++++----- 11 files changed, 241 insertions(+), 282 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java index 3a21bc70d7..7146af364e 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -20,7 +20,9 @@ package org.apache.qpid.server.store.berkeleydb.jmx; import java.io.IOException; -import java.util.List; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import javax.management.JMException; @@ -39,7 +41,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.jmx.ManagedObjectRegistry; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.model.IllegalStateTransitionException; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; /** * Management mbean for BDB HA. @@ -57,7 +62,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M try { GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType[] {SimpleType.STRING, SimpleType.STRING}; - final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT}; + final String[] itemNames = new String[] {GRP_MEM_COL_NODE_NAME, GRP_MEM_COL_NODE_HOST_PORT}; final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "}; GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member", itemNames, @@ -65,7 +70,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M GROUP_MEMBER_ATTRIBUTE_TYPES ); GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers", GROUP_MEMBER_ROW, - new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME}); + new String[] {GRP_MEM_COL_NODE_NAME}); } catch (final OpenDataException ode) { @@ -73,15 +78,15 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } } - private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; + private final BDBHAVirtualHostNode _virtualHostNode; private final String _objectName; - protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObjectRegistry registry) throws JMException + protected BDBHAMessageStoreManagerMBean(BDBHAVirtualHostNode virtualHostNode, ManagedObjectRegistry registry) throws JMException { super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, registry); - LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName); - _replicatedEnvironmentFacade = replicatedEnvironmentFacade; - _objectName = ObjectName.quote(virtualHostName); + LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostNode.getName()); + _virtualHostNode = virtualHostNode; + _objectName = ObjectName.quote( virtualHostNode.getGroupName()); register(); } @@ -94,46 +99,38 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M @Override public String getGroupName() { - return _replicatedEnvironmentFacade.getGroupName(); + return _virtualHostNode.getGroupName(); } @Override public String getNodeName() { - return _replicatedEnvironmentFacade.getNodeName(); + return _virtualHostNode.getName(); } @Override public String getNodeHostPort() { - return _replicatedEnvironmentFacade.getHostPort(); + return _virtualHostNode.getAddress(); } @Override public String getHelperHostPort() { - return _replicatedEnvironmentFacade.getHelperHostPort(); + return _virtualHostNode.getHelperAddress(); } @Override public String getDurability() throws IOException, JMException { - try - { - return _replicatedEnvironmentFacade.getDurability(); - } - catch (RuntimeException e) - { - LOGGER.debug("Failed query replication policy", e); - throw new JMException(e.getMessage()); - } + return _virtualHostNode.getDurability(); } @Override public boolean getCoalescingSync() throws IOException, JMException { - return _replicatedEnvironmentFacade.isCoalescingSync(); + return _virtualHostNode.isCoalescingSync(); } @Override @@ -141,11 +138,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { try { - return _replicatedEnvironmentFacade.getNodeState(); + return _virtualHostNode.getRole(); } catch (RuntimeException e) { - LOGGER.debug("Failed query node state", e); + LOGGER.debug("Failed query node role", e); throw new JMException(e.getMessage()); } } @@ -153,26 +150,30 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M @Override public boolean getDesignatedPrimary() throws IOException, JMException { - try - { - return _replicatedEnvironmentFacade.isDesignatedPrimary(); - } - catch (RuntimeException e) - { - LOGGER.debug("Failed query designated primary", e); - throw new JMException(e.getMessage()); - } + return _virtualHostNode.isDesignatedPrimary(); } @Override public TabularData getAllNodesInGroup() throws IOException, JMException { final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE); - final List> members = _replicatedEnvironmentFacade.getGroupMembers(); - for (Map map : members) + Map localNodeMap = new HashMap(); + localNodeMap.put(GRP_MEM_COL_NODE_NAME, _virtualHostNode.getName()); + localNodeMap.put(GRP_MEM_COL_NODE_HOST_PORT, _virtualHostNode.getAddress()); + CompositeData localNodeData = new CompositeDataSupport(GROUP_MEMBER_ROW, localNodeMap); + data.put(localNodeData); + + @SuppressWarnings("rawtypes") + final Collection members = _virtualHostNode.getRemoteReplicationNodes(); + for (RemoteReplicationNode remoteNode : members) { - CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map); + BDBHARemoteReplicationNode haReplicationNode = (BDBHARemoteReplicationNode)remoteNode; + Map nodeMap = new HashMap(); + nodeMap.put(GRP_MEM_COL_NODE_NAME, haReplicationNode.getName()); + nodeMap.put(GRP_MEM_COL_NODE_HOST_PORT, haReplicationNode.getAddress()); + + CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, nodeMap); data.put(memberData); } return data; @@ -181,14 +182,32 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M @Override public void removeNodeFromGroup(String nodeName) throws JMException { - try + if (getNodeName().equals(nodeName)) { - _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName); + _virtualHostNode.delete(); } - catch (RuntimeException e) + else { - LOGGER.error("Failed to remove node " + nodeName + " from group", e); - throw new JMException(e.getMessage()); + @SuppressWarnings("rawtypes") + Collection remoteNodes = _virtualHostNode.getRemoteReplicationNodes(); + for (RemoteReplicationNode remoteNode : remoteNodes) + { + if (remoteNode.getName().equals(nodeName)) + { + try + { + remoteNode.delete(); + return; + } + catch(IllegalStateTransitionException e) + { + LOGGER.error("Cannot remove node '" + nodeName + "' from the group", e); + throw new JMException("Cannot remove node '" + nodeName + "' from the group:" + e.getMessage()); + } + } + } + + throw new JMException("Failed to find replication node with name '" + nodeName + "'."); } } @@ -197,11 +216,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { try { - _replicatedEnvironmentFacade.setDesignatedPrimary(primary); + _virtualHostNode.setAttributes(Collections.singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, primary)); } catch (RuntimeException e) { - LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e); + LOGGER.error("Failed to set node " + _virtualHostNode.getName() + " as designated primary", e); throw new JMException(e.getMessage()); } } @@ -209,15 +228,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M @Override public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException { - try - { - _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort); - } - catch(RuntimeException e) - { - LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e); - throw new JMException(e.getMessage()); - } + throw new UnsupportedOperationException("Unsupported operation. Delete the node then add a new node in its place."); } @Override diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java index 558cc7e8a9..fb61df68b7 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -21,17 +21,12 @@ package org.apache.qpid.server.store.berkeleydb.jmx; import javax.management.JMException; -import javax.management.StandardMBean; import org.apache.log4j.Logger; import org.apache.qpid.server.jmx.MBeanProvider; import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.jmx.ManagedObjectRegistry; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; @@ -58,17 +53,12 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider @Override public ManagedObject createMBean(ConfiguredObject child, ManagedObjectRegistry registry) throws JMException { - BDBHAVirtualHostNode virtualHostNode = (BDBHAVirtualHostNode) child; - - BDBMessageStore messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore(); - if (LOGGER.isDebugEnabled()) { LOGGER.debug("Creating mBean for child " + child); } - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade(); - return new BDBHAMessageStoreManagerMBean(virtualHostNode.getGroupName(), replicatedEnvironmentFacade, registry); + return new BDBHAMessageStoreManagerMBean((BDBHAVirtualHostNode) child, registry); } @Override diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java index b85e44526b..fc1cd0801a 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java @@ -41,6 +41,9 @@ public interface ManagedBDBHAMessageStore public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; public static final String ATTR_COALESCING_SYNC = "CoalescingSync"; + public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; + public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") String getGroupName() throws IOException, JMException; diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index 6aeadde0f8..439af259ab 100644 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -19,15 +19,16 @@ */ package org.apache.qpid.server.store.berkeleydb.jmx; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.Iterator; import javax.management.JMException; import javax.management.ObjectName; @@ -37,22 +38,24 @@ import javax.management.openmbean.TabularData; import junit.framework.TestCase; -import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObjectRegistry; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.model.IllegalStateTransitionException; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; public class BDBHAMessageStoreManagerMBeanTest extends TestCase { - private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_VHOST_NAME = "test"; + private static final String TEST_GROUP_NAME = TEST_VHOST_NAME; private static final String TEST_NODE_NAME = "testNodeName"; private static final String TEST_NODE_HOST_PORT = "host:1234"; private static final String TEST_HELPER_HOST_PORT = "host:5678"; private static final String TEST_DURABILITY = "sync,sync,all"; private static final String TEST_NODE_STATE = "MASTER"; - private static final String TEST_STORE_NAME = "testStoreName"; private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false; - private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; + private BDBHAVirtualHostNode _virtualHostNode; private BDBHAMessageStoreManagerMBean _mBean; @Override @@ -60,9 +63,13 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase { super.setUp(); - _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class); + _virtualHostNode = mock(BDBHAVirtualHostNode.class); + when(_virtualHostNode.getName()).thenReturn(TEST_NODE_NAME); + when(_virtualHostNode.getGroupName()).thenReturn(TEST_GROUP_NAME); + when(_virtualHostNode.getAddress()).thenReturn(TEST_NODE_HOST_PORT); + ManagedObjectRegistry registry = mock(ManagedObjectRegistry.class); - _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacade, registry); + _mBean = new BDBHAMessageStoreManagerMBean(_virtualHostNode, registry); } @Override @@ -73,134 +80,119 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase public void testObjectName() throws Exception { - String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME); + String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_VHOST_NAME); assertEquals(expectedObjectName, _mBean.getObjectName().toString()); } public void testGroupName() throws Exception { - when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(TEST_GROUP_NAME); + when(_virtualHostNode.getGroupName()).thenReturn(TEST_GROUP_NAME); assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME)); } public void testNodeName() throws Exception { - when(_replicatedEnvironmentFacade.getNodeName()).thenReturn(TEST_NODE_NAME); + when(_virtualHostNode.getName()).thenReturn(TEST_NODE_NAME); assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME)); } public void testNodeHostPort() throws Exception { - when(_replicatedEnvironmentFacade.getHostPort()).thenReturn(TEST_NODE_HOST_PORT); + when(_virtualHostNode.getAddress()).thenReturn(TEST_NODE_HOST_PORT); assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT)); } public void testHelperHostPort() throws Exception { - when(_replicatedEnvironmentFacade.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT); + when(_virtualHostNode.getHelperAddress()).thenReturn(TEST_HELPER_HOST_PORT); assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT)); } public void testDurability() throws Exception { - when(_replicatedEnvironmentFacade.getDurability()).thenReturn(TEST_DURABILITY); + when(_virtualHostNode.getDurability()).thenReturn(TEST_DURABILITY); assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY)); } public void testCoalescingSync() throws Exception { - when(_replicatedEnvironmentFacade.isCoalescingSync()).thenReturn(true); + when(_virtualHostNode.isCoalescingSync()).thenReturn(true); assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC)); } public void testNodeState() throws Exception { - when(_replicatedEnvironmentFacade.getNodeState()).thenReturn(TEST_NODE_STATE); + when(_virtualHostNode.getRole()).thenReturn(TEST_NODE_STATE); assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE)); } public void testDesignatedPrimaryFlag() throws Exception { - when(_replicatedEnvironmentFacade.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); + when(_virtualHostNode.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY)); } public void testGroupMembersForGroupWithOneNode() throws Exception { - List> members = Collections.singletonList(createTestNodeResult()); - when(_replicatedEnvironmentFacade.getGroupMembers()).thenReturn(members); + BDBHARemoteReplicationNode node = mockRemoteNode(); final TabularData resultsTable = _mBean.getAllNodesInGroup(); - assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT); + assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME, + BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT); final int numberOfDataRows = resultsTable.size(); - assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows); - final CompositeData row = (CompositeData) resultsTable.values().iterator().next(); - assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME)); - assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT)); - } + assertEquals("Unexpected number of data rows", 2, numberOfDataRows); + Iterator iterator = resultsTable.values().iterator(); - public void testRemoveNodeFromReplicationGroup() throws Exception - { - _mBean.removeNodeFromGroup(TEST_NODE_NAME); + final CompositeData firstRow = (CompositeData) iterator.next(); + assertEquals(TEST_NODE_NAME, firstRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME)); + assertEquals(TEST_NODE_HOST_PORT, firstRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT)); - verify(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME); + final CompositeData secondRow = (CompositeData) iterator.next(); + assertEquals(node.getName(), secondRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME)); + assertEquals(node.getAddress(), secondRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT)); } - public void testRemoveNodeFromReplicationGroupWithError() throws Exception + public void testRemoveNodeFromReplicationGroup() throws Exception { - doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME); + BDBHARemoteReplicationNode node = mockRemoteNode(); - try - { - _mBean.removeNodeFromGroup(TEST_NODE_NAME); - fail("Exception not thrown"); - } - catch (JMException je) - { - // PASS - } - } - - public void testSetAsDesignatedPrimary() throws Exception - { - _mBean.setDesignatedPrimary(true); + _mBean.removeNodeFromGroup(node.getName()); - verify(_replicatedEnvironmentFacade).setDesignatedPrimary(true); + verify(node).delete(); } - public void testSetAsDesignatedPrimaryWithError() throws Exception + public void testRemoveNodeFromReplicationGroupOnIllegalStateTransitionException() throws Exception { - doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).setDesignatedPrimary(true); + BDBHARemoteReplicationNode node = mockRemoteNode(); + doThrow(new IllegalStateTransitionException("test")).when(node).delete(); - try + try { - _mBean.setDesignatedPrimary(true); + _mBean.removeNodeFromGroup("remotenode"); fail("Exception not thrown"); } catch (JMException je) { - // PASS + // PASS# } } - public void testUpdateAddress() throws Exception + public void testSetAsDesignatedPrimary() throws Exception { - String newHostName = "newHostName"; - int newPort = 1967; - - _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort); + _mBean.setDesignatedPrimary(true); - verify(_replicatedEnvironmentFacade).updateAddress(TEST_NODE_NAME, newHostName, newPort); + verify(_virtualHostNode).setAttributes( + eq(Collections. singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true))); } private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames) @@ -212,11 +204,16 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase } } - private Map createTestNodeResult() + private BDBHARemoteReplicationNode mockRemoteNode() { - Map items = new HashMap(); - items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); - items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); - return items; + BDBHARemoteReplicationNode remoteNode = mock(BDBHARemoteReplicationNode.class); + when(remoteNode.getName()).thenReturn("remotenode"); + when(remoteNode.getAddress()).thenReturn("remotehost:port"); + + @SuppressWarnings("rawtypes") + Collection remoteNodes = Collections.singletonList(remoteNode); + doReturn(remoteNodes).when(_virtualHostNode).getRemoteReplicationNodes(); + + return remoteNode; } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index ab3d5e0cfa..cde00a8804 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -135,10 +135,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static final String TYPE = "BDB-HA"; - // TODO: JMX will change to observe the model, at that point these names will disappear - public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; - public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; - private final ReplicatedEnvironmentConfiguration _configuration; private final Durability _durability; private final Boolean _coalescingSync; @@ -718,21 +714,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - public List> getGroupMembers() - { - List> members = new ArrayList>(); - - for (ReplicationNode node : _environment.getGroup().getNodes()) - { - Map nodeMap = new HashMap(); - nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, node.getName()); - nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); - members.add(nodeMap); - } - - return members; - } - private ReplicationGroupAdmin createReplicationGroupAdmin() { final Set helpers = new HashSet(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index d143d5a748..d1e6b39bcc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -21,14 +21,15 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; +import java.io.File; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import com.sleepycat.je.rep.MasterStateException; import com.sleepycat.je.rep.ReplicatedEnvironment; -import org.apache.log4j.Logger; +import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; @@ -53,7 +54,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject _state; - public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl virtualHostNode, Map attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade) + public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNode virtualHostNode, Map attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade) { super(parentsMap(virtualHostNode), attributes); _address = (String)attributes.get(ADDRESS); @@ -97,10 +98,30 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject proxyForValidation, final Set changedAttributes) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index ce7c79208f..d2ed22c14c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -37,8 +37,8 @@ import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; -import org.apache.log4j.Logger; +import org.apache.log4j.Logger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 4bff8918fd..6fd7b0bc1d 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -403,16 +403,32 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS)); - Collection remoteNodes = node1.getRemoteReplicationNodes(); - BDBHARemoteReplicationNodeImpl replicaRemoteNode = (BDBHARemoteReplicationNodeImpl)remoteNodes.iterator().next(); + BDBHARemoteReplicationNodeImpl replicaRemoteNode = null; long awaitReplicaRoleCount = 0; - while(!"REPLICA".equals(replicaRemoteNode.getRole())) + while(replicaRemoteNode == null) { + Collection remoteNodes = node1.getRemoteReplicationNodes(); + if (remoteNodes != null) + { + for (RemoteReplicationNode node : remoteNodes) + { + BDBHARemoteReplicationNodeImpl bdbNode = (BDBHARemoteReplicationNodeImpl)node; + if ("REPLICA".equals(bdbNode.getRole())) + { + replicaRemoteNode = bdbNode; + break; + } + } + if (replicaRemoteNode != null) + { + break; + } + } Thread.sleep(100); if (awaitReplicaRoleCount > 50) { - fail("Remote replication node is not in a REPLICA role"); + fail("Remote replication node is not in a REPLICA role: " + remoteNodes); } awaitReplicaRoleCount++; } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java index b15a4ab37d..db06aa579e 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java @@ -123,26 +123,24 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception { boolean isMaster = nodeName.equals(masterNode); - waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, isMaster? "MASTER" : "REPLICA"); + String expectedRole = isMaster? "MASTER" : "REPLICA"; + waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole); - Map nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName); + Map nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0"); assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME)); assertEquals("Unexpected type", "BDB_HA", nodeData.get(BDBHAVirtualHostNode.TYPE)); assertEquals("Unexpected path", new File(_storeBaseDir, nodeName).getPath(), nodeData.get(BDBHAVirtualHostNode.STORE_PATH)); assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS)); assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS)); assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME));; + assertEquals("Unexpected role", expectedRole, nodeData.get(BDBHAVirtualHostNode.ROLE)); if (isMaster) { - assertEquals("Unexpected role", "MASTER", nodeData.get(BDBHAVirtualHostNode.ROLE)); Map hostData = getRestTestHelper().getJsonAsSingletonList("virtualhost/" + masterNode + "/" + _hostName + "?depth=0"); assertEquals("Unexpected host name", _hostName, hostData.get(VirtualHost.NAME)); } - else - { - assertEquals("Unexpected role", "REPLICA", nodeData.get(BDBHAVirtualHostNode.ROLE)); - } + } private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index e8d18971ad..200f2c1087 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -38,7 +38,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -144,11 +143,11 @@ public class HAClusterManagementTest extends QpidBrokerTestCase CompositeData row = groupMembers.get(new Object[] {nodeName}); assertNotNull("Table does not contain row for node name " + nodeName, row); - assertEquals(nodeHostPort, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT)); + assertEquals(nodeHostPort, row.get(ManagedBDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); } } - public void testRemoveNodeFromGroup() throws Exception + public void testRemoveRemoteNodeFromGroup() throws Exception { final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); @@ -158,70 +157,17 @@ public class HAClusterManagementTest extends QpidBrokerTestCase final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); _clusterCreator.stopNode(brokerPortNumberToBeRemoved); - storeBean.removeNodeFromGroup(removedNodeName); - - final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); - assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval); - } - - /** - * Updates the address of a node. - * - * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit - * assert. - * - * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case - */ - public void testUpdateAddress() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next(); - final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate); - - _clusterCreator.stopNode(brokerPortNumberToBeMoved); - - final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); - final int newBdbPort = getNextAvailable(oldBdbPort + 1); - - storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); - - _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); - - _clusterCreator.startNode(brokerPortNumberToBeMoved); - } - - /** - * @see #testUpdateAddress() - */ - public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); - _clusterCreator.stopNode(brokerPortNumberToBeMoved); - - final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); - final int newBdbPort = getNextAvailable(oldBdbPort + 1); - - // now deliberately don't call updateAddress - - _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + storeBean.removeNodeFromGroup(removedNodeName); - try - { - _clusterCreator.startNode(brokerPortNumberToBeMoved); - fail("Exception not thrown"); - } - catch(RuntimeException rte) + long limitTime = System.currentTimeMillis() + 5000; + while((NUMBER_OF_NODES == storeBean.getAllNodesInGroup().size()) && System.currentTimeMillis() < limitTime) { - //check cause was BDBs EnvironmentFailureException - assertTrue("Message '"+rte.getMessage()+"' does not contain '" - + EnvironmentFailureException.class.getName() - + "'.", - rte.getMessage().contains(EnvironmentFailureException.class.getName())); - // PASS + Thread.sleep(100l); } + + int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows after test", NUMBER_OF_NODES - 1, numberOfDataRowsAfterRemoval); } public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception @@ -254,6 +200,20 @@ public class HAClusterManagementTest extends QpidBrokerTestCase } } + public void testSetDesignatedPrimary() throws Exception + { + int brokerPort = _clusterCreator.getBrokerPortNumbersForNodes().iterator().next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort); + assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + long limit = System.currentTimeMillis() + 5000; + while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) + { + Thread.sleep(100l); + } + assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary()); + } + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception { _jmxUtils.open(brokerPortNumber); diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java index d3b5b786e9..9d1cd8df59 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java @@ -23,15 +23,16 @@ package org.apache.qpid.server.jmx; import java.io.IOException; import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import javax.management.JMException; import org.apache.log4j.Logger; - import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.jmx.mbeans.LoggingManagementMBean; import org.apache.qpid.server.jmx.mbeans.ServerInformationMBean; @@ -81,7 +82,7 @@ public class JMXManagementPluginImpl private JMXManagedObjectRegistry _objectRegistry; private final Object _childrenLock = new Object(); - private final Map _children = new HashMap(); + private final Map> _children = new HashMap>(); @ManagedAttributeField private boolean _usePlatformMBeanServer; @@ -157,7 +158,7 @@ public class JMXManagementPluginImpl if (host != null) { VirtualHostMBean mbean = new VirtualHostMBean(host, _objectRegistry); - _children.put(host, mbean); + addMBean(host, mbean); } createAdditionalMBeansFromProviders(virtualHostNode, _objectRegistry); } @@ -170,7 +171,7 @@ public class JMXManagementPluginImpl UserManagementMBean mbean = new UserManagementMBean( (PasswordCredentialManagingAuthenticationProvider) authenticationProvider, _objectRegistry); - _children.put(authenticationProvider, mbean); + addMBean(authenticationProvider, mbean); } createAdditionalMBeansFromProviders(authenticationProvider, _objectRegistry); } @@ -186,6 +187,17 @@ public class JMXManagementPluginImpl _allowPortActivation = false; } + private void addMBean(ConfiguredObject configuredObject, ManagedObject mbean) + { + List mbeanList = _children.get(configuredObject); + if (mbeanList == null) + { + mbeanList = new ArrayList(); + _children.put(configuredObject, mbeanList); + } + mbeanList.add(mbean); + } + @Override public boolean isActivationAllowed(final Port port) { @@ -223,7 +235,21 @@ public class JMXManagementPluginImpl { for(ConfiguredObject object : _children.keySet()) { - AMQManagedObject mbean = _children.get(object); + unregisterChildMBeans(object); + } + _children.clear(); + } + getBroker().removeChangeListener(this); + closeObjectRegistry(); + } + + private void unregisterChildMBeans(ConfiguredObject object) + { + List mbeans = _children.get(object); + if (mbeans != null) + { + for (ManagedObject mbean : mbeans) + { if (mbean instanceof ConfigurationChangeListener) { object.removeChangeListener((ConfigurationChangeListener)mbean); @@ -237,10 +263,7 @@ public class JMXManagementPluginImpl LOGGER.error("Exception while unregistering mbean for " + object.getClass().getSimpleName() + " " + object.getName(), e); } } - _children.clear(); } - getBroker().removeChangeListener(this); - closeObjectRegistry(); } @Override @@ -278,7 +301,7 @@ public class JMXManagementPluginImpl if (mbean != null) { - _children.put(child, mbean); + addMBean(child, mbean); } createAdditionalMBeansFromProviders(child, _objectRegistry); } @@ -296,20 +319,8 @@ public class JMXManagementPluginImpl synchronized (_childrenLock) { child.removeChangeListener(this); - - AMQManagedObject mbean = _children.remove(child); - if(mbean != null) - { - try - { - mbean.unregister(); - } - catch(Exception e) - { - LOGGER.error("Exception while unregistering mbean for " + child.getClass().getSimpleName() + " " + child.getName(), e); - //TODO - report error on removing child MBean - } - } + unregisterChildMBeans(child); + _children.remove(child); } } @@ -337,8 +348,10 @@ public class JMXManagementPluginImpl LOGGER.debug("Provider will create mbean"); } mBean = provider.createMBean(child, registry); - // TODO track the mbeans that have been created on behalf of a child in a map, then - // if the child is ever removed, destroy these beans too. + if (mBean != null) + { + addMBean(child, mBean); + } } if(LOGGER.isDebugEnabled()) -- cgit v1.2.1