diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-06-28 16:46:12 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-06-28 16:46:12 +0000 |
| commit | 07c285f662e8f60d4e8aca247b65b77ca5df4587 (patch) | |
| tree | 7fe15262589c0fe5206e02a5e9336c6288f004e0 /qpid/java/bdbstore/src/main | |
| parent | bb45ec03f95ffdfa6c0163067dcb75af8b64ceb5 (diff) | |
| download | qpid-python-07c285f662e8f60d4e8aca247b65b77ca5df4587.tar.gz | |
QPID-3998, QPID-3999, QPID-4093: add new management plugins for jmx/rest/webui functionality, partial merge from the java-config-and-management branch at r1355039
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1355072 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
5 files changed, 61 insertions, 320 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index ab54d7d16a..c40f24dbc3 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -105,6 +105,8 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); }}); + public static final String BDB_HA_STORE_TYPE = "BDB-HA"; + private String _groupName; private String _nodeName; private String _nodeHostPort; @@ -113,8 +115,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private String _name; - private BDBHAMessageStoreManagerMBean _managedObject; - private CommitThreadWrapper _commitThreadWrapper; private boolean _coalescingSync; private boolean _designatedPrimary; @@ -149,8 +149,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC + "! Please set highAvailability.coalescingSync to false in store configuration."); } - _managedObject = new BDBHAMessageStoreManagerMBean(this); - _managedObject.register(); super.configure(name, storeConfig); } @@ -394,28 +392,18 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess @Override protected void closeInternal() throws Exception { + substituteNoOpStateChangeListenerOn(getReplicatedEnvironment()); + try { - substituteNoOpStateChangeListenerOn(getReplicatedEnvironment()); - - try - { - if(_coalescingSync) - { - _commitThreadWrapper.stopCommitThread(); - } - } - finally + if(_coalescingSync) { - super.closeInternal(); + _commitThreadWrapper.stopCommitThread(); } } finally { - if (_managedObject != null) - { - _managedObject.unregister(); - } + super.closeInternal(); } } @@ -610,4 +598,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { } } + + @Override + public String getStoreType() + { + return BDB_HA_STORE_TYPE; + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java deleted file mode 100644 index c2c7bf4c86..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import javax.management.JMException; -import javax.management.NotCompliantMBeanException; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.management.AMQManagedObject; - -public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore -{ - private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class); - - private static final TabularType GROUP_MEMBERS_TABLE; - private static final CompositeType GROUP_MEMBER_ROW; - private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES; - - static - { - try - { - GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING}; - final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT}; - final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "}; - GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member", - itemNames, - itemDescriptions, - GROUP_MEMBER_ATTRIBUTE_TYPES ); - GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers", - GROUP_MEMBER_ROW, - new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME}); - } - catch (final OpenDataException ode) - { - throw new ExceptionInInitializerError(ode); - } - } - - private final BDBHAMessageStore _store; - - protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store) throws NotCompliantMBeanException - { - super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE); - _store = store; - } - - @Override - public String getObjectInstanceName() - { - return _store.getName(); - } - - @Override - public String getGroupName() - { - return _store.getGroupName(); - } - - @Override - public String getNodeName() - { - return _store.getNodeName(); - } - - @Override - public String getNodeHostPort() - { - return _store.getNodeHostPort(); - } - - @Override - public String getHelperHostPort() - { - return _store.getHelperHostPort(); - } - - @Override - public String getDurability() throws IOException, JMException - { - try - { - return _store.getDurability(); - } - catch (RuntimeException e) - { - LOGGER.debug("Failed query replication policy", e); - throw new JMException(e.getMessage()); - } - } - - - @Override - public boolean getCoalescingSync() throws IOException, JMException - { - return _store.isCoalescingSync(); - } - - @Override - public String getNodeState() throws IOException, JMException - { - try - { - return _store.getNodeState(); - } - catch (RuntimeException e) - { - LOGGER.debug("Failed query node state", e); - throw new JMException(e.getMessage()); - } - } - - @Override - public boolean getDesignatedPrimary() throws IOException, JMException - { - try - { - return _store.isDesignatedPrimary(); - } - catch (RuntimeException e) - { - LOGGER.debug("Failed query designated primary", e); - throw new JMException(e.getMessage()); - } - } - - @Override - public TabularData getAllNodesInGroup() throws IOException, JMException - { - final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE); - final List<Map<String, String>> members = _store.getGroupMembers(); - - for (Map<String, String> map : members) - { - CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map); - data.put(memberData); - } - return data; - } - - @Override - public void removeNodeFromGroup(String nodeName) throws JMException - { - try - { - _store.removeNodeFromGroup(nodeName); - } - catch (AMQStoreException e) - { - LOGGER.error("Failed to remove node " + nodeName + " from group", e); - throw new JMException(e.getMessage()); - } - } - - @Override - public void setDesignatedPrimary(boolean primary) throws JMException - { - try - { - _store.setDesignatedPrimary(primary); - } - catch (AMQStoreException e) - { - LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e); - throw new JMException(e.getMessage()); - } - } - - @Override - public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException - { - try - { - _store.updateAddress(nodeName, newHostName, newPort); - } - catch(AMQStoreException e) - { - LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e); - throw new JMException(e.getMessage()); - } - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index d5bf5374bc..82bc3d8564 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -42,6 +42,7 @@ import com.sleepycat.je.EnvironmentConfig; public class BDBMessageStore extends AbstractBDBMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); + private static final String BDB_STORE_TYPE = "BDB"; private CommitThreadWrapper _commitThreadWrapper; @Override @@ -103,4 +104,11 @@ public class BDBMessageStore extends AbstractBDBMessageStore return _commitThreadWrapper.commit(tx, syncCommit); } + + @Override + public String getStoreType() + { + return BDB_STORE_TYPE; + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java deleted file mode 100644 index 6499ea04e0..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.IOException; - -import javax.management.JMException; -import javax.management.openmbean.TabularData; - -import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; -import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; -import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; - -public interface ManagedBDBHAMessageStore -{ - public static final String TYPE = "BDBHAMessageStore"; - - public static final String ATTR_GROUP_NAME = "GroupName"; - public static final String ATTR_NODE_NAME = "NodeName"; - public static final String ATTR_NODE_HOST_PORT = "NodeHostPort"; - public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort"; - public static final String ATTR_DURABILITY = "Durability"; - public static final String ATTR_NODE_STATE = "NodeState"; - public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; - public static final String ATTR_COALESCING_SYNC = "CoalescingSync"; - - @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") - String getGroupName() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group") - String getNodeName() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group") - String getNodeHostPort() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node") - String getNodeState() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") - String getHelperHostPort() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_DURABILITY, description="Durability") - String getDurability() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") - boolean getDesignatedPrimary() throws IOException, JMException; - - @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.") - boolean getCoalescingSync() throws IOException, JMException; - - @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") - TabularData getAllNodesInGroup() throws IOException, JMException; - - @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") - void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName) throws JMException; - - @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.") - void setDesignatedPrimary(@MBeanOperationParameter(name="primary", description="designated primary")boolean primary) throws JMException; - - @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.") - void updateAddress(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName, - @MBeanOperationParameter(name="newHostName", description="new hostname")String newHostName, - @MBeanOperationParameter(name="newPort", description="new port number")int newPort) throws JMException; -} - diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 3265fb6823..97a3d61df1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -45,6 +45,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; import org.apache.qpid.server.util.MapJsonSerializer; @@ -93,6 +94,8 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private MapJsonSerializer _serializer = new MapJsonSerializer(); + private static final boolean _moveNonExclusiveQueueOwnerToDescription = Boolean.parseBoolean(System.getProperty("qpid.move_non_exclusive_queue_owner_to_description", Boolean.TRUE.toString())); + /** * Upgrades from a v5 database to a v6 database * @@ -554,17 +557,49 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private UpgradeConfiguredObjectRecord createQueueConfiguredObjectRecord(String queueName, String owner, boolean exclusive, FieldTable arguments) { + Map<String, Object> attributesMap = buildQueueArgumentMap(queueName, + owner, exclusive, arguments); + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json); + return configuredObject; + } + + private Map<String, Object> buildQueueArgumentMap(String queueName, + String owner, boolean exclusive, FieldTable arguments) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); attributesMap.put(Queue.NAME, queueName); - attributesMap.put(Queue.OWNER, owner); attributesMap.put(Queue.EXCLUSIVE, exclusive); + + FieldTable argumentsCopy = new FieldTable(); if (arguments != null) { - attributesMap.put("ARGUMENTS", FieldTable.convertToMap(arguments)); + argumentsCopy.addAll(arguments); } - String json = _serializer.serialize(attributesMap); - UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json); - return configuredObject; + + if (moveNonExclusiveOwnerToDescription(owner, exclusive)) + { + _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION); + + attributesMap.put(Queue.OWNER, null); + argumentsCopy.put(AMQShortString.valueOf(AMQQueueFactory.X_QPID_DESCRIPTION), owner); + } + else + { + attributesMap.put(Queue.OWNER, owner); + } + if (!argumentsCopy.isEmpty()) + { + attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(argumentsCopy)); + } + return attributesMap; + } + + private boolean moveNonExclusiveOwnerToDescription(String owner, + boolean exclusive) + { + return exclusive == false && owner != null && _moveNonExclusiveQueueOwnerToDescription; } private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType, |
