summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-06-28 16:46:12 +0000
committerRobert Gemmell <robbie@apache.org>2012-06-28 16:46:12 +0000
commit07c285f662e8f60d4e8aca247b65b77ca5df4587 (patch)
tree7fe15262589c0fe5206e02a5e9336c6288f004e0 /qpid/java/bdbstore/src/main
parentbb45ec03f95ffdfa6c0163067dcb75af8b64ceb5 (diff)
downloadqpid-python-07c285f662e8f60d4e8aca247b65b77ca5df4587.tar.gz
QPID-3998, QPID-3999, QPID-4093: add new management plugins for jmx/rest/webui functionality, partial merge from the java-config-and-management branch at r1355039
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1355072 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java32
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java214
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java82
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java45
5 files changed, 61 insertions, 320 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index ab54d7d16a..c40f24dbc3 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -105,6 +105,8 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
}});
+ public static final String BDB_HA_STORE_TYPE = "BDB-HA";
+
private String _groupName;
private String _nodeName;
private String _nodeHostPort;
@@ -113,8 +115,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private String _name;
- private BDBHAMessageStoreManagerMBean _managedObject;
-
private CommitThreadWrapper _commitThreadWrapper;
private boolean _coalescingSync;
private boolean _designatedPrimary;
@@ -149,8 +149,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
+ "! Please set highAvailability.coalescingSync to false in store configuration.");
}
- _managedObject = new BDBHAMessageStoreManagerMBean(this);
- _managedObject.register();
super.configure(name, storeConfig);
}
@@ -394,28 +392,18 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
@Override
protected void closeInternal() throws Exception
{
+ substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
+
try
{
- substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
-
- try
- {
- if(_coalescingSync)
- {
- _commitThreadWrapper.stopCommitThread();
- }
- }
- finally
+ if(_coalescingSync)
{
- super.closeInternal();
+ _commitThreadWrapper.stopCommitThread();
}
}
finally
{
- if (_managedObject != null)
- {
- _managedObject.unregister();
- }
+ super.closeInternal();
}
}
@@ -610,4 +598,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
}
}
+
+ @Override
+ public String getStoreType()
+ {
+ return BDB_HA_STORE_TYPE;
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
deleted file mode 100644
index c2c7bf4c86..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import javax.management.JMException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.management.AMQManagedObject;
-
-public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
-{
- private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class);
-
- private static final TabularType GROUP_MEMBERS_TABLE;
- private static final CompositeType GROUP_MEMBER_ROW;
- private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES;
-
- static
- {
- try
- {
- GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
- final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
- final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
- GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
- itemNames,
- itemDescriptions,
- GROUP_MEMBER_ATTRIBUTE_TYPES );
- GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
- GROUP_MEMBER_ROW,
- new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
- }
- catch (final OpenDataException ode)
- {
- throw new ExceptionInInitializerError(ode);
- }
- }
-
- private final BDBHAMessageStore _store;
-
- protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store) throws NotCompliantMBeanException
- {
- super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE);
- _store = store;
- }
-
- @Override
- public String getObjectInstanceName()
- {
- return _store.getName();
- }
-
- @Override
- public String getGroupName()
- {
- return _store.getGroupName();
- }
-
- @Override
- public String getNodeName()
- {
- return _store.getNodeName();
- }
-
- @Override
- public String getNodeHostPort()
- {
- return _store.getNodeHostPort();
- }
-
- @Override
- public String getHelperHostPort()
- {
- return _store.getHelperHostPort();
- }
-
- @Override
- public String getDurability() throws IOException, JMException
- {
- try
- {
- return _store.getDurability();
- }
- catch (RuntimeException e)
- {
- LOGGER.debug("Failed query replication policy", e);
- throw new JMException(e.getMessage());
- }
- }
-
-
- @Override
- public boolean getCoalescingSync() throws IOException, JMException
- {
- return _store.isCoalescingSync();
- }
-
- @Override
- public String getNodeState() throws IOException, JMException
- {
- try
- {
- return _store.getNodeState();
- }
- catch (RuntimeException e)
- {
- LOGGER.debug("Failed query node state", e);
- throw new JMException(e.getMessage());
- }
- }
-
- @Override
- public boolean getDesignatedPrimary() throws IOException, JMException
- {
- try
- {
- return _store.isDesignatedPrimary();
- }
- catch (RuntimeException e)
- {
- LOGGER.debug("Failed query designated primary", e);
- throw new JMException(e.getMessage());
- }
- }
-
- @Override
- public TabularData getAllNodesInGroup() throws IOException, JMException
- {
- final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
- final List<Map<String, String>> members = _store.getGroupMembers();
-
- for (Map<String, String> map : members)
- {
- CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map);
- data.put(memberData);
- }
- return data;
- }
-
- @Override
- public void removeNodeFromGroup(String nodeName) throws JMException
- {
- try
- {
- _store.removeNodeFromGroup(nodeName);
- }
- catch (AMQStoreException e)
- {
- LOGGER.error("Failed to remove node " + nodeName + " from group", e);
- throw new JMException(e.getMessage());
- }
- }
-
- @Override
- public void setDesignatedPrimary(boolean primary) throws JMException
- {
- try
- {
- _store.setDesignatedPrimary(primary);
- }
- catch (AMQStoreException e)
- {
- LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
- throw new JMException(e.getMessage());
- }
- }
-
- @Override
- public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException
- {
- try
- {
- _store.updateAddress(nodeName, newHostName, newPort);
- }
- catch(AMQStoreException e)
- {
- LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
- throw new JMException(e.getMessage());
- }
- }
-
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index d5bf5374bc..82bc3d8564 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -42,6 +42,7 @@ import com.sleepycat.je.EnvironmentConfig;
public class BDBMessageStore extends AbstractBDBMessageStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
+ private static final String BDB_STORE_TYPE = "BDB";
private CommitThreadWrapper _commitThreadWrapper;
@Override
@@ -103,4 +104,11 @@ public class BDBMessageStore extends AbstractBDBMessageStore
return _commitThreadWrapper.commit(tx, syncCommit);
}
+
+ @Override
+ public String getStoreType()
+ {
+ return BDB_STORE_TYPE;
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
deleted file mode 100644
index 6499ea04e0..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.IOException;
-
-import javax.management.JMException;
-import javax.management.openmbean.TabularData;
-
-import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
-
-public interface ManagedBDBHAMessageStore
-{
- public static final String TYPE = "BDBHAMessageStore";
-
- public static final String ATTR_GROUP_NAME = "GroupName";
- public static final String ATTR_NODE_NAME = "NodeName";
- public static final String ATTR_NODE_HOST_PORT = "NodeHostPort";
- public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort";
- public static final String ATTR_DURABILITY = "Durability";
- public static final String ATTR_NODE_STATE = "NodeState";
- public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
- public static final String ATTR_COALESCING_SYNC = "CoalescingSync";
-
- @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
- String getGroupName() throws IOException, JMException;
-
- @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group")
- String getNodeName() throws IOException, JMException;
-
- @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group")
- String getNodeHostPort() throws IOException, JMException;
-
- @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node")
- String getNodeState() throws IOException, JMException;
-
- @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members")
- String getHelperHostPort() throws IOException, JMException;
-
- @MBeanAttribute(name=ATTR_DURABILITY, description="Durability")
- String getDurability() throws IOException, JMException;
-
- @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.")
- boolean getDesignatedPrimary() throws IOException, JMException;
-
- @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.")
- boolean getCoalescingSync() throws IOException, JMException;
-
- @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
- TabularData getAllNodesInGroup() throws IOException, JMException;
-
- @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group")
- void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName) throws JMException;
-
- @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.")
- void setDesignatedPrimary(@MBeanOperationParameter(name="primary", description="designated primary")boolean primary) throws JMException;
-
- @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.")
- void updateAddress(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName,
- @MBeanOperationParameter(name="newHostName", description="new hostname")String newHostName,
- @MBeanOperationParameter(name="newPort", description="new port number")int newPort) throws JMException;
-}
-
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
index 3265fb6823..97a3d61df1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
@@ -45,6 +45,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.util.MapJsonSerializer;
@@ -93,6 +94,8 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
private MapJsonSerializer _serializer = new MapJsonSerializer();
+ private static final boolean _moveNonExclusiveQueueOwnerToDescription = Boolean.parseBoolean(System.getProperty("qpid.move_non_exclusive_queue_owner_to_description", Boolean.TRUE.toString()));
+
/**
* Upgrades from a v5 database to a v6 database
*
@@ -554,17 +557,49 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
private UpgradeConfiguredObjectRecord createQueueConfiguredObjectRecord(String queueName, String owner, boolean exclusive,
FieldTable arguments)
{
+ Map<String, Object> attributesMap = buildQueueArgumentMap(queueName,
+ owner, exclusive, arguments);
+ String json = _serializer.serialize(attributesMap);
+ UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json);
+ return configuredObject;
+ }
+
+ private Map<String, Object> buildQueueArgumentMap(String queueName,
+ String owner, boolean exclusive, FieldTable arguments)
+ {
+
Map<String, Object> attributesMap = new HashMap<String, Object>();
attributesMap.put(Queue.NAME, queueName);
- attributesMap.put(Queue.OWNER, owner);
attributesMap.put(Queue.EXCLUSIVE, exclusive);
+
+ FieldTable argumentsCopy = new FieldTable();
if (arguments != null)
{
- attributesMap.put("ARGUMENTS", FieldTable.convertToMap(arguments));
+ argumentsCopy.addAll(arguments);
}
- String json = _serializer.serialize(attributesMap);
- UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json);
- return configuredObject;
+
+ if (moveNonExclusiveOwnerToDescription(owner, exclusive))
+ {
+ _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION);
+
+ attributesMap.put(Queue.OWNER, null);
+ argumentsCopy.put(AMQShortString.valueOf(AMQQueueFactory.X_QPID_DESCRIPTION), owner);
+ }
+ else
+ {
+ attributesMap.put(Queue.OWNER, owner);
+ }
+ if (!argumentsCopy.isEmpty())
+ {
+ attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(argumentsCopy));
+ }
+ return attributesMap;
+ }
+
+ private boolean moveNonExclusiveOwnerToDescription(String owner,
+ boolean exclusive)
+ {
+ return exclusive == false && owner != null && _moveNonExclusiveQueueOwnerToDescription;
}
private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType,