summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java57
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java12
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java49
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java1867
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java665
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java85
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java1644
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java313
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java (renamed from qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java)40
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java58
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java)17
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java37
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java228
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java76
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java76
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java40
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java1052
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java152
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java26
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java170
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java14
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java128
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java208
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java336
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java70
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java4
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java37
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java40
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java8
31 files changed, 4651 insertions, 2866 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
index aa4ddd8181..f36c1ecc6f 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -36,19 +36,12 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
/**
* Management mbean for BDB HA.
- * <p>
- * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is
- * currently arranged through OSGI using the <b>fragment</b> feature so that this bundle shares the
- * same classloader as broker-plugins-management-jmx. See the <b>Fragment-Host:</b> header within the MANIFEST.MF
- * of this bundle.
- * </p>
*/
public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
{
@@ -63,7 +56,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
try
{
GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
- final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
+ final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT};
final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
itemNames,
@@ -71,7 +64,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
GROUP_MEMBER_ATTRIBUTE_TYPES );
GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
GROUP_MEMBER_ROW,
- new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
+ new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME});
}
catch (final OpenDataException ode)
{
@@ -79,44 +72,46 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
}
- private final BDBHAMessageStore _store;
+ private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+ private final String _objectName;
- protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException
+ protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
{
super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
- LOGGER.debug("Creating BDBHAMessageStoreManagerMBean");
- _store = store;
+ LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName);
+ _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+ _objectName = ObjectName.quote(virtualHostName);
register();
}
@Override
public String getObjectInstanceName()
{
- return ObjectName.quote(_store.getName());
+ return _objectName;
}
@Override
public String getGroupName()
{
- return _store.getGroupName();
+ return _replicatedEnvironmentFacade.getGroupName();
}
@Override
public String getNodeName()
{
- return _store.getNodeName();
+ return _replicatedEnvironmentFacade.getNodeName();
}
@Override
public String getNodeHostPort()
{
- return _store.getNodeHostPort();
+ return _replicatedEnvironmentFacade.getHostPort();
}
@Override
public String getHelperHostPort()
{
- return _store.getHelperHostPort();
+ return _replicatedEnvironmentFacade.getHelperHostPort();
}
@Override
@@ -124,7 +119,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- return _store.getDurability();
+ return _replicatedEnvironmentFacade.getDurability();
}
catch (RuntimeException e)
{
@@ -137,7 +132,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public boolean getCoalescingSync() throws IOException, JMException
{
- return _store.isCoalescingSync();
+ return _replicatedEnvironmentFacade.isCoalescingSync();
}
@Override
@@ -145,7 +140,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- return _store.getNodeState();
+ return _replicatedEnvironmentFacade.getNodeState();
}
catch (RuntimeException e)
{
@@ -159,7 +154,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- return _store.isDesignatedPrimary();
+ return _replicatedEnvironmentFacade.isDesignatedPrimary();
}
catch (RuntimeException e)
{
@@ -172,7 +167,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
public TabularData getAllNodesInGroup() throws IOException, JMException
{
final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
- final List<Map<String, String>> members = _store.getGroupMembers();
+ final List<Map<String, String>> members = _replicatedEnvironmentFacade.getGroupMembers();
for (Map<String, String> map : members)
{
@@ -187,9 +182,9 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- _store.removeNodeFromGroup(nodeName);
+ _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
}
- catch (StoreException e)
+ catch (RuntimeException e)
{
LOGGER.error("Failed to remove node " + nodeName + " from group", e);
throw new JMException(e.getMessage());
@@ -201,11 +196,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- _store.setDesignatedPrimary(primary);
+ _replicatedEnvironmentFacade.setDesignatedPrimary(primary);
}
- catch (StoreException e)
+ catch (RuntimeException e)
{
- LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
+ LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e);
throw new JMException(e.getMessage());
}
}
@@ -215,9 +210,9 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- _store.updateAddress(nodeName, newHostName, newPort);
+ _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort);
}
- catch(StoreException e)
+ catch(RuntimeException e)
{
LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
throw new JMException(e.getMessage());
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
index 0492350a25..16199d30a3 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
@@ -28,11 +28,12 @@ import org.apache.qpid.server.jmx.MBeanProvider;
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
/**
* This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual
- * host and of type {@link BDBHAMessageStore#TYPE}.
+ * host and of type {@link ReplicatedEnvironmentFacade#TYPE}.
*
*/
public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@@ -48,7 +49,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
public boolean isChildManageableByMBean(ConfiguredObject child)
{
return (child instanceof VirtualHost
- && BDBHAMessageStore.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+ && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
}
@Override
@@ -56,14 +57,15 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
{
VirtualHost virtualHostChild = (VirtualHost) child;
- BDBHAMessageStore messageStore = (BDBHAMessageStore) virtualHostChild.getMessageStore();
+ BDBMessageStore messageStore = (BDBMessageStore) virtualHostChild.getMessageStore();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Creating mBean for child " + child);
}
- return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
+ return new BDBHAMessageStoreManagerMBean(virtualHostChild.getName(), replicatedEnvironmentFacade, (ManagedObject) parent);
}
@Override
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index 0d963ebdae..fa16d1061a 100644
--- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -37,10 +37,9 @@ import javax.management.openmbean.TabularData;
import junit.framework.TestCase;
-import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
@@ -53,7 +52,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
private static final String TEST_STORE_NAME = "testStoreName";
private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
- private BDBHAMessageStore _store;
+ private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
private BDBHAMessageStoreManagerMBean _mBean;
private AMQManagedObject _mBeanParent;
@@ -62,10 +61,10 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
super.setUp();
- _store = mock(BDBHAMessageStore.class);
+ _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class);
_mBeanParent = mock(AMQManagedObject.class);
when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
- _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent);
+ _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacade, _mBeanParent);
}
@Override
@@ -76,64 +75,62 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
public void testObjectName() throws Exception
{
- when(_store.getName()).thenReturn(TEST_STORE_NAME);
-
String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
assertEquals(expectedObjectName, _mBean.getObjectName().toString());
}
public void testGroupName() throws Exception
{
- when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(TEST_GROUP_NAME);
assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
}
public void testNodeName() throws Exception
{
- when(_store.getNodeName()).thenReturn(TEST_NODE_NAME);
+ when(_replicatedEnvironmentFacade.getNodeName()).thenReturn(TEST_NODE_NAME);
assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
}
public void testNodeHostPort() throws Exception
{
- when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+ when(_replicatedEnvironmentFacade.getHostPort()).thenReturn(TEST_NODE_HOST_PORT);
assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
}
public void testHelperHostPort() throws Exception
{
- when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+ when(_replicatedEnvironmentFacade.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
}
public void testDurability() throws Exception
{
- when(_store.getDurability()).thenReturn(TEST_DURABILITY);
+ when(_replicatedEnvironmentFacade.getDurability()).thenReturn(TEST_DURABILITY);
assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
}
public void testCoalescingSync() throws Exception
{
- when(_store.isCoalescingSync()).thenReturn(true);
+ when(_replicatedEnvironmentFacade.isCoalescingSync()).thenReturn(true);
assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
}
public void testNodeState() throws Exception
{
- when(_store.getNodeState()).thenReturn(TEST_NODE_STATE);
+ when(_replicatedEnvironmentFacade.getNodeState()).thenReturn(TEST_NODE_STATE);
assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
}
public void testDesignatedPrimaryFlag() throws Exception
{
- when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+ when(_replicatedEnvironmentFacade.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
}
@@ -141,29 +138,29 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
public void testGroupMembersForGroupWithOneNode() throws Exception
{
List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
- when(_store.getGroupMembers()).thenReturn(members);
+ when(_replicatedEnvironmentFacade.getGroupMembers()).thenReturn(members);
final TabularData resultsTable = _mBean.getAllNodesInGroup();
- assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT);
+ assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT);
final int numberOfDataRows = resultsTable.size();
assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
- assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME));
- assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME));
+ assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
}
public void testRemoveNodeFromReplicationGroup() throws Exception
{
_mBean.removeNodeFromGroup(TEST_NODE_NAME);
- verify(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ verify(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME);
}
public void testRemoveNodeFromReplicationGroupWithError() throws Exception
{
- doThrow(new StoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME);
try
{
@@ -180,12 +177,12 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
_mBean.setDesignatedPrimary(true);
- verify(_store).setDesignatedPrimary(true);
+ verify(_replicatedEnvironmentFacade).setDesignatedPrimary(true);
}
public void testSetAsDesignatedPrimaryWithError() throws Exception
{
- doThrow(new StoreException("mocked exception")).when(_store).setDesignatedPrimary(true);
+ doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).setDesignatedPrimary(true);
try
{
@@ -205,7 +202,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
_mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
- verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ verify(_replicatedEnvironmentFacade).updateAddress(TEST_NODE_NAME, newHostName, newPort);
}
private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
@@ -220,8 +217,8 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
private Map<String, String> createTestNodeResult()
{
Map<String, String> items = new HashMap<String, String>();
- items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
- items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+ items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+ items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
return items;
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
deleted file mode 100644
index 37fb77f547..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ /dev/null
@@ -1,1867 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.*;
-import com.sleepycat.je.Transaction;
-
-import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.*;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
-import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
-import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
-import org.apache.qpid.util.FileUtils;
-
-public abstract class AbstractBDBMessageStore implements MessageStore, DurableConfigurationStore
-{
- private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class);
-
- private static final int LOCK_RETRY_ATTEMPTS = 5;
-
- public static final int VERSION = 7;
-
- private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
- {{
- put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
- put(EnvironmentConfig.STATS_COLLECT, "false"); // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84
- }});
-
- private final AtomicBoolean _closed = new AtomicBoolean(false);
-
- private Environment _environment;
-
- private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
- private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
- private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
- private static String DELIVERYDB_NAME = "QUEUE_ENTRIES";
- private static String BRIDGEDB_NAME = "BRIDGES";
- private static String LINKDB_NAME = "LINKS";
- private static String XIDDB_NAME = "XIDS";
- private static String CONFIG_VERSION_DB = "CONFIG_VERSION";
-
- private Database _configuredObjectsDb;
- private Database _configVersionDb;
- private Database _messageMetaDataDb;
- private Database _messageContentDb;
- private Database _deliveryDb;
- private Database _bridgeDb;
- private Database _linkDb;
- private Database _xidDb;
-
- /* =======
- * Schema:
- * =======
- *
- * Queue:
- * name(AMQShortString) - name(AMQShortString), owner(AMQShortString),
- * arguments(FieldTable encoded as binary), exclusive (boolean)
- *
- * Exchange:
- * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean)
- *
- * Binding:
- * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString),
- * arguments (FieldTable encoded as binary) - 0 (zero)
- *
- * QueueEntry:
- * queueName(AMQShortString), messageId (long) - 0 (zero)
- *
- * Message (MetaData):
- * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary)
- *
- * Message (Content):
- * messageId (long), byteOffset (integer) - dataLength(integer), data(binary)
- */
-
- private final AtomicLong _messageId = new AtomicLong(0);
-
- protected final StateManager _stateManager;
-
- private MessageStoreRecoveryHandler _messageRecoveryHandler;
-
- private TransactionLogRecoveryHandler _tlogRecoveryHandler;
-
- private ConfigurationRecoveryHandler _configRecoveryHandler;
-
- private long _totalStoreSize;
- private boolean _limitBusted;
- private long _persistentSizeLowThreshold;
- private long _persistentSizeHighThreshold;
-
- private final EventManager _eventManager = new EventManager();
- private String _storeLocation;
-
- private Map<String, String> _envConfigMap;
- private VirtualHost _virtualHost;
-
- public AbstractBDBMessageStore()
- {
- _stateManager = new StateManager(_eventManager);
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- _eventManager.addEventListener(eventListener, events);
- }
-
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
- {
- _stateManager.attainState(State.INITIALISING);
-
- _configRecoveryHandler = recoveryHandler;
- _virtualHost = virtualHost;
- }
-
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
- {
- if(_stateManager.isInState(State.INITIAL))
- {
- // Is acting as a message store, but not a durable config store
- _stateManager.attainState(State.INITIALISING);
- }
-
- _messageRecoveryHandler = messageRecoveryHandler;
- _tlogRecoveryHandler = tlogRecoveryHandler;
- _virtualHost = virtualHost;
-
- completeInitialisation();
- }
-
- private void completeInitialisation()
- {
- configure(_virtualHost);
-
- _stateManager.attainState(State.INITIALISED);
- }
-
- public synchronized void activate()
- {
- // check if acting as a durable config store, but not a message store
- if(_stateManager.isInState(State.INITIALISING))
- {
- completeInitialisation();
- }
- _stateManager.attainState(State.ACTIVATING);
-
- if(_configRecoveryHandler != null)
- {
- recoverConfig(_configRecoveryHandler);
- }
- if(_messageRecoveryHandler != null)
- {
- recoverMessages(_messageRecoveryHandler);
- }
- if(_tlogRecoveryHandler != null)
- {
- recoverQueueEntries(_tlogRecoveryHandler);
- }
-
- _stateManager.attainState(State.ACTIVE);
- }
-
- public org.apache.qpid.server.store.Transaction newTransaction()
- {
- return new BDBTransaction();
- }
-
- /**
- * Called after instantiation in order to configure the message store.
- *
- *
- *
- * @param virtualHost The virtual host using this store
- * @return whether a new store environment was created or not (to indicate whether recovery is necessary)
- *
- * @throws Exception If any error occurs that means the store is unable to configure itself.
- */
- public void configure(VirtualHost virtualHost)
- {
- configure(virtualHost, _messageRecoveryHandler != null);
- }
-
- public void configure(VirtualHost virtualHost, boolean isMessageStore)
- {
- String name = virtualHost.getName();
- final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name;
-
- String storeLocation;
- if(isMessageStore)
- {
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
- }
- else // we are acting only as the durable config store
- {
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
- }
-
- Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
- Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
-
- _persistentSizeHighThreshold = overfullAttr == null ? -1l :
- overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
- _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
- underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
-
-
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
-
- File environmentPath = new File(storeLocation);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
-
- _storeLocation = storeLocation;
-
- _envConfigMap = new HashMap<String, String>();
- _envConfigMap.putAll(ENVCONFIG_DEFAULTS);
-
- Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
- if(bdbEnvConfigAttr instanceof Map)
- {
- _envConfigMap.putAll((Map)bdbEnvConfigAttr);
- }
-
- LOGGER.info("Configuring BDB message store");
-
- setupStore(environmentPath, name);
- }
-
- protected Map<String,String> getConfigMap(Map<String, String> defaultConfig, Configuration config, String prefix) throws ConfigurationException
- {
- final List<Object> argumentNames = config.getList(prefix + ".name");
- final List<Object> argumentValues = config.getList(prefix + ".value");
- final int initialSize = argumentNames.size() + defaultConfig.size();
-
- final Map<String,String> attributes = new HashMap<String,String>(initialSize);
- attributes.putAll(defaultConfig);
-
- for (int i = 0; i < argumentNames.size(); i++)
- {
- final String argName = argumentNames.get(i).toString();
- final String argValue = argumentValues.get(i).toString();
-
- attributes.put(argName, argValue);
- }
-
- return Collections.unmodifiableMap(attributes);
- }
-
- @Override
- public String getStoreLocation()
- {
- return _storeLocation;
- }
-
- /**
- * Move the store state from INITIAL to ACTIVE without actually recovering.
- *
- * This is required if you do not want to perform recovery of the store data
- *
- * @throws org.apache.qpid.server.store.StoreException if the store is not in the correct state
- */
- void startWithNoRecover() throws StoreException
- {
- _stateManager.attainState(State.INITIALISING);
- _stateManager.attainState(State.INITIALISED);
- _stateManager.attainState(State.ACTIVATING);
- _stateManager.attainState(State.ACTIVE);
- }
-
- protected void setupStore(File storePath, String name)
- {
- _environment = createEnvironment(storePath);
-
- new Upgrader(_environment, name).upgradeIfNecessary();
-
- openDatabases();
-
- _totalStoreSize = getSizeOnDisk();
- }
-
- protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException;
-
- public Environment getEnvironment()
- {
- return _environment;
- }
-
- private void openDatabases() throws DatabaseException
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
-
- //This is required if we are wanting read only access.
- dbConfig.setReadOnly(false);
-
- _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig);
- _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig);
- _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
- _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
- _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
- _linkDb = openDatabase(LINKDB_NAME, dbConfig);
- _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig);
- _xidDb = openDatabase(XIDDB_NAME, dbConfig);
- }
-
- private Database openDatabase(final String dbName, final DatabaseConfig dbConfig)
- {
- // if opening read-only and the database doesn't exist, then you can't create it
- return dbConfig.getReadOnly() && !_environment.getDatabaseNames().contains(dbName)
- ? null
- : _environment.openDatabase(null, dbName, dbConfig);
- }
-
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
- */
- public void close()
- {
- if (_closed.compareAndSet(false, true))
- {
- _stateManager.attainState(State.CLOSING);
- closeInternal();
- _stateManager.attainState(State.CLOSED);
- }
- }
-
- protected void closeInternal()
- {
- if (_messageMetaDataDb != null)
- {
- LOGGER.info("Closing message metadata database");
- _messageMetaDataDb.close();
- }
-
- if (_messageContentDb != null)
- {
- LOGGER.info("Closing message content database");
- _messageContentDb.close();
- }
-
- if (_configuredObjectsDb != null)
- {
- LOGGER.info("Closing configurable objects database");
- _configuredObjectsDb.close();
- }
-
- if (_deliveryDb != null)
- {
- LOGGER.info("Close delivery database");
- _deliveryDb.close();
- }
-
- if (_bridgeDb != null)
- {
- LOGGER.info("Close bridge database");
- _bridgeDb.close();
- }
-
- if (_linkDb != null)
- {
- LOGGER.info("Close link database");
- _linkDb.close();
- }
-
-
- if (_xidDb != null)
- {
- LOGGER.info("Close xid database");
- _xidDb.close();
- }
-
-
- if (_configVersionDb != null)
- {
- LOGGER.info("Close config version database");
- _configVersionDb.close();
- }
-
- closeEnvironment();
-
- }
-
- private void closeEnvironment() throws DatabaseException
- {
- if (_environment != null)
- {
- // Clean the log before closing. This makes sure it doesn't contain
- // redundant data. Closing without doing this means the cleaner may not
- // get a chance to finish.
- try
- {
- _environment.cleanLog();
- }
- finally
- {
- _environment.close();
- }
- }
- }
-
-
- private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler)
- {
- try
- {
- final int configVersion = getConfigVersion();
- recoveryHandler.beginConfigurationRecovery(this, configVersion);
- loadConfiguredObjects(recoveryHandler);
-
- final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
- if(newConfigVersion != configVersion)
- {
- updateConfigVersion(newConfigVersion);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
-
- }
-
- private void updateConfigVersion(int newConfigVersion) throws StoreException
- {
- Cursor cursor = null;
- try
- {
- Transaction txn = _environment.beginTransaction(null, null);
- cursor = _configVersionDb.openCursor(txn, null);
- DatabaseEntry key = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0,key);
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- IntegerBinding.intToEntry(newConfigVersion, value);
- OperationStatus status = cursor.put(key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error setting config version: " + status);
- }
- }
- cursor.close();
- cursor = null;
- txn.commit();
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
- private int getConfigVersion() throws StoreException
- {
- Cursor cursor = null;
- try
- {
- cursor = _configVersionDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- return IntegerBinding.entryToInt(value);
- }
-
- // Insert 0 as the default config version
- IntegerBinding.intToEntry(0,value);
- ByteBinding.byteToEntry((byte) 0,key);
- OperationStatus status = _configVersionDb.put(null, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error initialising config version: " + status);
- }
- return 0;
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
- {
- Cursor cursor = null;
- try
- {
- cursor = _configuredObjectsDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value);
- crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes());
- }
-
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- private void closeCursorSafely(Cursor cursor)
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
-
- private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
- {
- StoredMessageRecoveryHandler mrh = msrh.begin();
-
- Cursor cursor = null;
- try
- {
- cursor = _messageMetaDataDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
- long maxId = 0;
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
- mrh.message(message);
-
- maxId = Math.max(maxId, messageId);
- }
-
- _messageId.set(maxId);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
- throws DatabaseException
- {
- QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
- ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
- Cursor cursor = null;
- try
- {
- cursor = _deliveryDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- QueueEntryKey qek = keyBinding.entryToObject(key);
-
- entries.add(qek);
- }
-
- try
- {
- cursor.close();
- }
- finally
- {
- cursor = null;
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- qerh.queueEntry(queueId, messageId);
- }
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
-
- cursor = null;
- try
- {
- cursor = _xidDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
- preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
- }
-
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
-
- public void removeMessage(long messageId, boolean sync) throws StoreException
- {
-
- boolean complete = false;
- com.sleepycat.je.Transaction tx = null;
-
- Random rand = null;
- int attempts = 0;
- try
- {
- do
- {
- tx = null;
- try
- {
- tx = _environment.beginTransaction(null, null);
-
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removing message id " + messageId);
- }
-
-
- OperationStatus status = _messageMetaDataDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
- messageId);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleted metadata for message " + messageId);
- }
-
- //now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- _messageContentDb.delete(tx, contentKeyEntry);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleted content for message " + messageId);
- }
-
- commit(tx, sync);
- complete = true;
- tx = null;
- }
- catch (LockConflictException e)
- {
- try
- {
- if(tx != null)
- {
- tx.abort();
- }
- }
- catch(DatabaseException e2)
- {
- LOGGER.warn("Unable to abort transaction after LockConflictExcption", e2);
- // rethrow the original log conflict exception, the secondary exception should already have
- // been logged.
- throw e;
- }
-
-
- LOGGER.warn("Lock timeout exception. Retrying (attempt "
- + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
-
- if(++attempts < LOCK_RETRY_ATTEMPTS)
- {
- if(rand == null)
- {
- rand = new Random();
- }
-
- try
- {
- Thread.sleep(500l + (long)(500l * rand.nextDouble()));
- }
- catch (InterruptedException e1)
- {
-
- }
- }
- else
- {
- // rethrow the lock conflict exception since we could not solve by retrying
- throw e;
- }
- }
- }
- while(!complete);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Unexpected BDB exception", e);
-
- if (tx != null)
- {
- try
- {
- tx.abort();
- tx = null;
- }
- catch (DatabaseException e1)
- {
- throw new StoreException("Error aborting transaction " + e1, e1);
- }
- }
-
- throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
- }
- finally
- {
- if (tx != null)
- {
- try
- {
- tx.abort();
- tx = null;
- }
- catch (DatabaseException e1)
- {
- throw new StoreException("Error aborting transaction " + e1, e1);
- }
- }
- }
- }
-
- @Override
- public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes);
- storeConfiguredObjectEntry(configuredObject);
- }
- }
-
- @Override
- public void remove(UUID id, String type) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called");
- }
- OperationStatus status = removeConfiguredObject(null, id);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Configured object of type " + type + " with id " + id + " not found");
- }
- }
-
- @Override
- public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException
- {
- com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
- Collection<UUID> removed = new ArrayList<UUID>(objects.length);
- for(UUID id : objects)
- {
- if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS)
- {
- removed.add(id);
- }
- }
-
- txn.commit();
- return removed.toArray(new UUID[removed.size()]);
- }
-
- @Override
- public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException
- {
- update(false, id, type, attributes, null);
- }
-
- public void update(ConfiguredObjectRecord... records) throws StoreException
- {
- update(false, records);
- }
-
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
- {
- com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
- for(ConfiguredObjectRecord record : records)
- {
- update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn);
- }
- txn.commit();
- }
-
- private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws
- StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Updating " +type + ", id: " + id);
- }
-
- try
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(id, key);
-
- DatabaseEntry value = new DatabaseEntry();
- DatabaseEntry newValue = new DatabaseEntry();
- ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
-
- OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND))
- {
- ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
-
- // write the updated entry to the store
- configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
- status = _configuredObjectsDb.put(txn, key, newValue);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error updating queue details within the store: " + status);
- }
- }
- else if (status != OperationStatus.NOTFOUND)
- {
- throw new StoreException("Error finding queue details within the store: " + status);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error updating queue details within the store: " + e,e);
- }
- }
-
- /**
- * Places a message onto a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The the queue to place the message on.
- * @param messageId The message to enqueue.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
- */
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws StoreException
- {
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
- keyBinding.objectToEntry(dd, key);
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Enqueuing message " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " in transaction " + tx);
- }
- _deliveryDb.put(tx, key, value);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
- throw new StoreException("Error writing enqueued message with id " + messageId + " for queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " to database", e);
- }
- }
-
- /**
- * Extracts a message from a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The queue to take the message from.
- * @param messageId The message to dequeue.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws StoreException
- {
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
- UUID id = queue.getId();
- keyBinding.objectToEntry(queueEntryKey, key);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Dequeue message id " + messageId + " from queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
- }
-
- try
- {
-
- OperationStatus status = _deliveryDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Unable to find message with id " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Unable to remove message with id " + messageId + " on queue"
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removed message " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
- + " from delivery db");
-
- }
- }
- catch (DatabaseException e)
- {
-
- LOGGER.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e);
- LOGGER.error(tx);
-
- throw new StoreException("Error accessing database while dequeuing message: " + e.getMessage(), e);
- }
- }
-
-
- private void recordXid(com.sleepycat.je.Transaction txn,
- long format,
- byte[] globalId,
- byte[] branchId,
- org.apache.qpid.server.store.Transaction.Record[] enqueues,
- org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
- keyBinding.objectToEntry(xid,key);
-
- DatabaseEntry value = new DatabaseEntry();
- PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- valueBinding.objectToEntry(preparedTransaction, value);
-
- try
- {
- _xidDb.put(txn, key, value);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Failed to write xid: " + e.getMessage(), e);
- throw new StoreException("Error writing xid to database", e);
- }
- }
-
- private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
- throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
-
- keyBinding.objectToEntry(xid, key);
-
-
- try
- {
-
- OperationStatus status = _xidDb.delete(txn, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Unable to find xid");
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Unable to remove xid");
- }
-
- }
- catch (DatabaseException e)
- {
-
- LOGGER.error("Failed to remove xid ", e);
- LOGGER.error(txn);
-
- throw new StoreException("Error accessing database while removing xid: " + e.getMessage(), e);
- }
- }
-
- /**
- * Commits all operations performed within a given transaction.
- *
- * @param tx The transaction to commit all operations for.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
- */
- private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws
- StoreException
- {
- if (tx == null)
- {
- throw new StoreException("Fatal internal error: transactional is null at commitTran");
- }
-
- StoreFuture result;
- try
- {
- result = commit(tx, syncCommit);
-
- if (LOGGER.isDebugEnabled())
- {
- String transactionType = syncCommit ? "synchronous" : "asynchronous";
- LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error commit tx: " + e.getMessage(), e);
- }
-
- return result;
- }
-
- /**
- * Abandons all operations performed within a given transaction.
- *
- * @param tx The transaction to abandon.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
- */
- public void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("abortTran called for transaction " + tx);
- }
-
- try
- {
- tx.abort();
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error aborting transaction: " + e.getMessage(), e);
- }
- }
-
- /**
- * Primarily for testing purposes.
- *
- * @param queueId
- *
- * @return a list of message ids for messages enqueued for a particular queue
- */
- List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
- {
- Cursor cursor = null;
- try
- {
- cursor = _deliveryDb.openCursor(null, null);
-
- DatabaseEntry key = new DatabaseEntry();
-
- QueueEntryKey dd = new QueueEntryKey(queueId, 0);
-
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- keyBinding.objectToEntry(dd, key);
-
- DatabaseEntry value = new DatabaseEntry();
-
- LinkedList<Long> messageIds = new LinkedList<Long>();
-
- OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = keyBinding.entryToObject(key);
-
- while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
- {
-
- messageIds.add(dd.getMessageId());
- status = cursor.getNext(key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- dd = keyBinding.entryToObject(key);
- }
- }
-
- return messageIds;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Database error: " + e.getMessage(), e);
- }
- finally
- {
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error closing cursor: " + e.getMessage(), e);
- }
- }
- }
- }
-
- /**
- * Return a valid, currently unused message id.
- *
- * @return A fresh message id.
- */
- public long getNewMessageId()
- {
- return _messageId.incrementAndGet();
- }
-
- /**
- * Stores a chunk of message data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param offset The offset of the data chunk in the message.
- * @param contentBody The content of the data chunk.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
- ByteBuffer contentBody) throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding messageBinding = ContentBinding.getInstance();
- messageBinding.objectToEntry(contentBody.array(), value);
- try
- {
- OperationStatus status = _messageContentDb.put(tx, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error adding content for message id " + messageId + ": " + status);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
-
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Stores message meta-data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param messageMetaData The message meta data to store.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
- StorableMessageMetaData messageMetaData)
- throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("storeMetaData called for transaction " + tx
- + ", messageId " + messageId
- + ", messageMetaData " + messageMetaData);
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
-
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
- messageBinding.objectToEntry(messageMetaData, value);
- try
- {
- _messageMetaDataDb.put(tx, key, value);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Retrieves message meta-data.
- *
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = "
- + messageId + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
-
- try
- {
- OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Metadata not found for message with id " + messageId);
- }
-
- StorableMessageMetaData mdd = messageBinding.entryToObject(value);
-
- return mdd;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
- }
- }
-
- /**
- * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
- * from the specified offset in the message.
- *
- * @param messageId The message to get the data for.
- * @param offset The offset of the data within the message.
- * @param dst The destination of the content read back
- *
- * @return The number of bytes inserted into the destination
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
- {
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding contentTupleBinding = ContentBinding.getInstance();
-
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
- }
-
- try
- {
-
- int written = 0;
- OperationStatus status = _messageContentDb.get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
- if (status == OperationStatus.SUCCESS)
- {
- byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
- int size = dataAsBytes.length;
- if (offset > size)
- {
- throw new StoreException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
-
- }
-
- written = size - offset;
- if(written > dst.remaining())
- {
- written = dst.remaining();
- }
-
- dst.put(dataAsBytes, offset, written);
- }
- return written;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- @SuppressWarnings("unchecked")
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
- {
- if(metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(getNewMessageId(), metaData);
- }
- }
-
- //Package getters for the various databases used by the Store
-
- Database getMetaDataDb()
- {
- return _messageMetaDataDb;
- }
-
- Database getContentDb()
- {
- return _messageContentDb;
- }
-
- Database getDeliveryDb()
- {
- return _deliveryDb;
- }
-
- /**
- * Makes the specified configured object persistent.
- *
- * @param configuredObject Details of the configured object to store.
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
- */
- private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- LOGGER.debug("Storing configured object: " + configuredObject);
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(configuredObject.getId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
-
- queueBinding.objectToEntry(configuredObject, value);
- try
- {
- OperationStatus status = _configuredObjectsDb.put(null, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error writing configured object " + configuredObject + " to database: "
- + status);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error writing configured object " + configuredObject
- + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws StoreException
- {
-
- LOGGER.debug("Removing configured object: " + id);
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
- uuidBinding.objectToEntry(id, key);
- try
- {
- return _configuredObjectsDb.delete(tx, key);
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error deleting of configured object with id " + id + " from database", e);
- }
- }
-
- protected abstract StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException;
-
-
- private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
- {
-
- private final long _messageId;
- private final boolean _isRecovered;
-
- private StorableMessageMetaData _metaData;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-
- private byte[] _data;
- private volatile SoftReference<byte[]> _dataRef;
-
- StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
- {
- this(messageId, metaData, false);
- }
-
- StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
- {
- _messageId = messageId;
- _isRecovered = isRecovered;
-
- if(!_isRecovered)
- {
- _metaData = metaData;
- }
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- public StorableMessageMetaData getMetaData()
- {
- StorableMessageMetaData metaData = _metaDataRef.get();
- if(metaData == null)
- {
- metaData = AbstractBDBMessageStore.this.getMessageMetaData(_messageId);
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- return metaData;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
- {
- src = src.slice();
-
- if(_data == null)
- {
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
- }
- else
- {
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
-
- System.arraycopy(oldData,0,_data,0,oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
- }
-
- }
-
- public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
- }
- else
- {
- return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
- }
- }
-
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- return ByteBuffer.wrap(data,offsetInMessage,size);
- }
- else
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- int length = getContent(offsetInMessage, buf);
- buf.limit(length);
- buf.position(0);
- return buf;
- }
- }
-
- synchronized void store(com.sleepycat.je.Transaction txn)
- {
- if (!stored())
- {
- try
- {
- _dataRef = new SoftReference<byte[]>(_data);
- AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
- AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
- }
- catch(DatabaseException e)
- {
- throw new StoreException(e);
- }
- finally
- {
- _metaData = null;
- _data = null;
- }
- }
- }
-
- public synchronized StoreFuture flushToStore()
- {
- if(!stored())
- {
- com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
- store(txn);
- AbstractBDBMessageStore.this.commit(txn,true);
- storedSizeChange(getMetaData().getContentSize());
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- public void remove()
- {
- int delta = getMetaData().getContentSize();
- AbstractBDBMessageStore.this.removeMessage(_messageId, false);
- storedSizeChange(-delta);
- }
-
- private boolean stored()
- {
- return _metaData == null || _isRecovered;
- }
- }
-
- private class BDBTransaction implements org.apache.qpid.server.store.Transaction
- {
- private com.sleepycat.je.Transaction _txn;
- private int _storeSizeIncrease;
-
- private BDBTransaction()
- {
- try
- {
- _txn = _environment.beginTransaction(null, null);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Exception during transaction begin, closing store environment.", e);
- closeEnvironmentSafely();
-
- throw new StoreException("Exception during transaction begin, store environment closed.", e);
- }
- }
-
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- if(message.getStoredMessage() instanceof StoredBDBMessage)
- {
- final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
- }
-
- AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
- }
-
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
- }
-
- public void commitTran()
- {
- AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
- AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
- }
-
- public StoreFuture commitTranAsync()
- {
- AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
- return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
- }
-
- public void abortTran()
- {
- AbstractBDBMessageStore.this.abortTran(_txn);
- }
-
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
-
- }
-
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
- Record[] dequeues)
- {
- AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
- }
- }
-
- private void storedSizeChange(final int delta)
- {
- if(getPersistentSizeHighThreshold() > 0)
- {
- synchronized (this)
- {
- // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
- // time, so we do so only when there's been enough change that it is worth looking again. We do this by
- // assuming the total size will change by less than twice the amount of the message data change.
- long newSize = _totalStoreSize += 2*delta;
-
- if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
- {
- _totalStoreSize = getSizeOnDisk();
-
- if(_totalStoreSize > getPersistentSizeHighThreshold())
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
- }
- else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
- {
- long oldSize = _totalStoreSize;
- _totalStoreSize = getSizeOnDisk();
-
- if(oldSize <= _totalStoreSize)
- {
-
- reduceSizeOnDisk();
-
- _totalStoreSize = getSizeOnDisk();
-
- }
-
- if(_totalStoreSize < getPersistentSizeLowThreshold())
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
-
-
- }
- }
- }
- }
-
- private void reduceSizeOnDisk()
- {
- _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
- boolean cleaned = false;
- while (_environment.cleanLog() > 0)
- {
- cleaned = true;
- }
- if (cleaned)
- {
- CheckpointConfig force = new CheckpointConfig();
- force.setForce(true);
- _environment.checkpoint(force);
- }
-
-
- _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
- }
-
- private long getSizeOnDisk()
- {
- return _environment.getStats(null).getTotalLogSize();
- }
-
- private long getPersistentSizeLowThreshold()
- {
- return _persistentSizeLowThreshold;
- }
-
- private long getPersistentSizeHighThreshold()
- {
- return _persistentSizeHighThreshold;
- }
-
- private void setEnvironmentConfigProperties(EnvironmentConfig envConfig)
- {
- for (Map.Entry<String, String> configItem : _envConfigMap.entrySet())
- {
- LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
- envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
- }
- }
-
- protected EnvironmentConfig createEnvironmentConfig()
- {
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
-
- setEnvironmentConfigProperties(envConfig);
-
- envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
-
- return envConfig;
- }
-
- protected void closeEnvironmentSafely()
- {
- try
- {
- _environment.close();
- }
- catch (DatabaseException ex)
- {
- LOGGER.error("Exception closing store environment", ex);
- }
- catch (IllegalStateException ex)
- {
- LOGGER.error("Exception closing store environment", ex);
- }
- }
-
-
- private class LoggingAsyncExceptionListener implements ExceptionListener
- {
- @Override
- public void exceptionThrown(ExceptionEvent event)
- {
- LOGGER.error("Asynchronous exception thrown by BDB thread '"
- + event.getThreadName() + "'", event.getException());
- }
- }
-
- @Override
- public void onDelete()
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleting store " + _storeLocation);
- }
-
- if (_storeLocation != null)
- {
- File location = new File(_storeLocation);
- if (location.exists())
- {
- if (!FileUtils.delete(location, true))
- {
- LOGGER.error("Cannot delete " + _storeLocation);
- }
- }
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
deleted file mode 100644
index d99733acf0..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ /dev/null
@@ -1,665 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.TaskPrincipal;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.HAMessageStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.State;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Durability;
-import com.sleepycat.je.Durability.ReplicaAckPolicy;
-import com.sleepycat.je.Durability.SyncPolicy;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.OperationFailureException;
-import com.sleepycat.je.Transaction;
-import com.sleepycat.je.rep.InsufficientLogException;
-import com.sleepycat.je.rep.NetworkRestore;
-import com.sleepycat.je.rep.NetworkRestoreConfig;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-import com.sleepycat.je.rep.ReplicationMutableConfig;
-import com.sleepycat.je.rep.ReplicationNode;
-import com.sleepycat.je.rep.StateChangeEvent;
-import com.sleepycat.je.rep.StateChangeListener;
-import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
-
-import javax.security.auth.Subject;
-
-public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore
-{
- private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
-
- private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY);
-
- public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
- public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
-
- @SuppressWarnings("serial")
- private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
- {{
- /**
- * Parameter decreased as the 24h default may lead very large log files for most users.
- */
- put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
- /**
- * Parameter increased as the 5 s default may lead to spurious timeouts.
- */
- put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
- /**
- * Parameter increased as the 10 s default may lead to spurious timeouts.
- */
- put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
- /**
- * Parameter increased as the 10 h default may cause user confusion.
- */
- put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
- /**
- * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False
- * is scheduled to become default after JE 5.0.48.
- */
- put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
- /**
- * Parameter decreased as a default 5min interval may lead to bigger data losses on Node
- * with NO_SYN durability in case if such Node crushes.
- */
- put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
- }});
-
- public static final String TYPE = "BDB-HA";
-
- private String _groupName;
- private String _nodeName;
- private String _nodeHostPort;
- private String _helperHostPort;
- private Durability _durability;
-
- private String _name;
-
- private CommitThreadWrapper _commitThreadWrapper;
- private boolean _coalescingSync;
- private boolean _designatedPrimary;
- private Map<String, String> _repConfig;
-
- @Override
- public void configure(VirtualHost virtualHost)
- {
- //Mandatory configuration
- _groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
- _nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
- _nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
- _helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress");
- _name = virtualHost.getName();
-
- //Optional configuration
- String durabilitySetting = getStringAttribute(virtualHost,"haDurability",null);
- if (durabilitySetting == null)
- {
- _durability = DEFAULT_DURABILITY;
- }
- else
- {
- _durability = Durability.parse(durabilitySetting);
- }
- _designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", Boolean.FALSE);
- _coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", Boolean.TRUE);
-
- _repConfig = new HashMap<String, String>(REPCONFIG_DEFAULTS);
- Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig");
- if(repConfigAttr instanceof Map)
- {
- _repConfig.putAll((Map)repConfigAttr);
- }
-
- if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC)
- {
- throw new StoreException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
- + "! Please set highAvailability.coalescingSync to false in store configuration.");
- }
-
- super.configure(virtualHost);
- }
-
-
- private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if(attrValue != null)
- {
- return attrValue.toString();
- }
- else
- {
- throw new StoreException("BDB HA configuration key not found. Please specify configuration attribute: "
- + attributeName);
- }
- }
-
- private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if(attrValue != null)
- {
- return attrValue.toString();
- }
- return defaultVal;
- }
-
- private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if(attrValue != null)
- {
- if(attrValue instanceof Boolean)
- {
- return ((Boolean) attrValue).booleanValue();
- }
- else if(attrValue instanceof String)
- {
- return Boolean.parseBoolean((String)attrValue);
- }
-
- }
- return defaultVal;
- }
-
-
- @Override
- protected void setupStore(File storePath, String name) throws DatabaseException
- {
- super.setupStore(storePath, name);
-
- if(_coalescingSync)
- {
- _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
- _commitThreadWrapper.startCommitThread();
- }
- }
-
- @Override
- protected Environment createEnvironment(File environmentPath) throws DatabaseException
- {
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Environment path " + environmentPath.getAbsolutePath());
- LOGGER.info("Group name " + _groupName);
- LOGGER.info("Node name " + _nodeName);
- LOGGER.info("Node host port " + _nodeHostPort);
- LOGGER.info("Helper host port " + _helperHostPort);
- LOGGER.info("Durability " + _durability);
- LOGGER.info("Coalescing sync " + _coalescingSync);
- LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
- }
-
- final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
-
- replicationConfig.setHelperHosts(_helperHostPort);
- replicationConfig.setDesignatedPrimary(_designatedPrimary);
- setReplicationConfigProperties(replicationConfig);
-
- final EnvironmentConfig envConfig = createEnvironmentConfig();
- envConfig.setDurability(_durability);
-
- ReplicatedEnvironment replicatedEnvironment = null;
- try
- {
- replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
- }
- catch (final InsufficientLogException ile)
- {
- LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
- NetworkRestore restore = new NetworkRestore();
- NetworkRestoreConfig config = new NetworkRestoreConfig();
- config.setRetainLogFiles(false);
- restore.execute(ile, config);
- replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
- }
-
- return replicatedEnvironment;
- }
-
- @Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
- {
- super.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler);
-
- final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
-
- replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
- }
-
- @Override
- public synchronized void activate()
- {
- // Before proceeding, perform a log flush with an fsync
- getEnvironment().flushLog(true);
-
- super.activate();
- }
-
- @Override
- public synchronized void passivate()
- {
- if (_stateManager.isNotInState(State.INITIALISED))
- {
- LOGGER.debug("Store becoming passive");
- _stateManager.attainState(State.INITIALISED);
- }
- }
-
- public String getName()
- {
- return _name;
- }
-
- public String getGroupName()
- {
- return _groupName;
- }
-
- public String getNodeName()
- {
- return _nodeName;
- }
-
- public String getNodeHostPort()
- {
- return _nodeHostPort;
- }
-
- public String getHelperHostPort()
- {
- return _helperHostPort;
- }
-
- public String getDurability()
- {
- return _durability.toString();
- }
-
- public boolean isCoalescingSync()
- {
- return _coalescingSync;
- }
-
- public String getNodeState()
- {
- ReplicatedEnvironment.State state = getReplicatedEnvironment().getState();
- return state.toString();
- }
-
- public Boolean isDesignatedPrimary()
- {
- return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary();
- }
-
- public List<Map<String, String>> getGroupMembers()
- {
- List<Map<String, String>> members = new ArrayList<Map<String,String>>();
-
- for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes())
- {
- Map<String, String> nodeMap = new HashMap<String, String>();
- nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName());
- nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
- members.add(nodeMap);
- }
-
- return members;
- }
-
- public void removeNodeFromGroup(String nodeName) throws StoreException
- {
- try
- {
- createReplicationGroupAdmin().removeMember(nodeName);
- }
- catch (OperationFailureException ofe)
- {
- throw new StoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe);
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e);
- }
- }
-
- public void setDesignatedPrimary(boolean isPrimary) throws StoreException
- {
- try
- {
- final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
- synchronized(replicatedEnvironment)
- {
- final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig();
- final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
- replicatedEnvironment.setRepMutableConfig(newConfig);
- }
-
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e);
- }
- }
-
- ReplicatedEnvironment getReplicatedEnvironment()
- {
- return (ReplicatedEnvironment)getEnvironment();
- }
-
- public void updateAddress(String nodeName, String newHostName, int newPort) throws StoreException
- {
- try
- {
- createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
- }
- catch (OperationFailureException ofe)
- {
- throw new StoreException("Failed to update address for '" + nodeName +
- "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe);
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Failed to update address for '" + nodeName +
- "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e);
- }
- }
-
- @Override
- protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
- {
- // Using commit() instead of commitNoSync() for the HA store to allow
- // the HA durability configuration to influence resulting behaviour.
- try
- {
- tx.commit();
- }
- catch (DatabaseException de)
- {
- LOGGER.error("Got DatabaseException on commit, closing environment", de);
-
- closeEnvironmentSafely();
-
- throw de;
- }
-
- if(_coalescingSync)
- {
- return _commitThreadWrapper.commit(tx, syncCommit);
- }
- else
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
- }
-
- @Override
- protected void closeInternal()
- {
- substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
-
- try
- {
- if(_coalescingSync)
- {
- try
- {
- _commitThreadWrapper.stopCommitThread();
- }
- catch (InterruptedException e)
- {
- throw new StoreException(e);
- }
- }
- }
- finally
- {
- super.closeInternal();
- }
- }
-
- /**
- * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED} during
- * {@link Environment#close()}. We replace the StateChangeListener so we silently ignore this state change.
- */
- private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment)
- {
- LOGGER.debug("Substituting no-op state change listener for environment close");
- replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener());
- }
-
- private ReplicationGroupAdmin createReplicationGroupAdmin()
- {
- final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
- helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets());
-
- final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig();
- helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
-
- return new ReplicationGroupAdmin(_groupName, helpers);
- }
-
-
- private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
- {
- for (Map.Entry<String, String> configItem : _repConfig.entrySet())
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
- }
- replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
- }
- }
-
- private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException
- {
- if (!config.containsKey(key))
- {
- throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: "
- + key.replace('.', '/'));
- }
- return config.getString(key);
- }
-
- private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
- {
- private final Executor _executor = Executors.newSingleThreadExecutor();
-
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent)
- {
- com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
-
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Received BDB event indicating transition to state " + state);
- }
-
- switch (state)
- {
- case MASTER:
- activateStoreAsync();
- break;
- case REPLICA:
- passivateStoreAsync();
- break;
- case DETACHED:
- LOGGER.error("BDB replicated node in detached state, therefore passivating.");
- passivateStoreAsync();
- break;
- case UNKNOWN:
- LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
- break;
- default:
- LOGGER.error("Unexpected state change: " + state);
- throw new IllegalStateException("Unexpected state change: " + state);
- }
- }
-
- /**
- * Calls {@link MessageStore#activate()}.
- *
- * <p/>
- *
- * This is done a background thread, in line with
- * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
- * activate may execute transactions, which can't complete until
- * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
- */
- private void activateStoreAsync()
- {
- String threadName = "BDBHANodeActivationThread-" + _name;
- executeStateChangeAsync(new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
- {
- try
- {
- activate();
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to activate on hearing MASTER change event",e);
- throw e;
- }
- return null;
- }
- }, threadName);
- }
-
- /**
- * Calls {@link #passivate()}.
- *
- * <p/>
- * This is done a background thread, in line with
- * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
- * passivation due to the effect of state change listeners.
- */
- private void passivateStoreAsync()
- {
- String threadName = "BDBHANodePassivationThread-" + _name;
- executeStateChangeAsync(new Callable<Void>()
- {
-
- @Override
- public Void call() throws Exception
- {
- try
- {
- passivate();
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e);
- throw e;
- }
- return null;
- }
- }, threadName);
- }
-
- private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
- {
-
- _executor.execute(new Runnable()
- {
-
- @Override
- public void run()
- {
- final String originalThreadName = Thread.currentThread().getName();
- Thread.currentThread().setName(threadName);
-
- try
- {
- Subject.doAs(SecurityManager.getSystemTaskSubject("BDB HA State Change"), new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
-
- try
- {
- callable.call();
- }
- catch (Exception e)
- {
- LOGGER.error("Exception during state change", e);
- }
- return null;
- }
- });
- }
- finally
- {
- Thread.currentThread().setName(originalThreadName);
- }
- }
- });
- }
- }
-
- private class NoOpStateChangeListener implements StateChangeListener
- {
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent)
- {
- }
- }
-
- @Override
- public String getStoreType()
- {
- return TYPE;
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index b055f8bd90..3fdc12ba31 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.store.berkeleydb;
*
*/
+import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
@@ -31,15 +32,22 @@ import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.OperationalLoggingListener;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
public class BDBHAVirtualHost extends AbstractVirtualHost
{
- private BDBHAMessageStore _messageStore;
+ private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class);
+
+ private BDBMessageStore _messageStore;
private boolean _inVhostInitiatedClose;
@@ -52,11 +60,9 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
}
-
-
protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost)
{
- _messageStore = new BDBHAMessageStore();
+ _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
final MessageStoreLogSubject storeLogSubject =
new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
@@ -84,6 +90,11 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
virtualHost, recoveryHandler,
recoveryHandler
);
+
+ // Make the virtualhost model object a replication group listener
+ ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade();
+ environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+
}
@@ -194,4 +205,70 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
}
}
+ private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
+ {
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Received BDB event indicating transition to state " + state
+ + " when current message store state is " + _messageStore._stateManager.getState());
+ }
+
+ switch (state)
+ {
+ case MASTER:
+ activate();
+ break;
+ case REPLICA:
+ passivate();
+ break;
+ case DETACHED:
+ LOGGER.error("BDB replicated node in detached state, therefore passivating.");
+ passivate();
+ break;
+ case UNKNOWN:
+ LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
+ break;
+ default:
+ LOGGER.error("Unexpected state change: " + state);
+ throw new IllegalStateException("Unexpected state change: " + state);
+ }
+ }
+
+ private void activate()
+ {
+ try
+ {
+ _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+ _messageStore.activate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to activate on hearing MASTER change event", e);
+ }
+ }
+
+ private void passivate()
+ {
+ try
+ {
+ //TODO: move this this into the store method passivate()
+ if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
+ {
+ _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
+ }
+ }
+
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index acff8e2b21..35dae4b800 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -20,16 +20,44 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.*;
+import com.sleepycat.je.Transaction;
+
import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.util.FileUtils;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -39,83 +67,1623 @@ import com.sleepycat.je.EnvironmentConfig;
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
-public class BDBMessageStore extends AbstractBDBMessageStore
+public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
- public static final String TYPE = "BDB";
- private CommitThreadWrapper _commitThreadWrapper;
+
+ public static final int VERSION = 7;
+ public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
+
+ private static final int LOCK_RETRY_ATTEMPTS = 5;
+ private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
+ private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
+ private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
+ private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
+ private static String BRIDGEDB_NAME = "BRIDGES";
+ private static String LINKDB_NAME = "LINKS";
+ private static String XID_DB_NAME = "XIDS";
+ private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION";
+ private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, MESSAGE_META_DATA_DB_NAME,
+ MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME };
+
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private EnvironmentFacade _environmentFacade;
+ private final AtomicLong _messageId = new AtomicLong(0);
+
+ protected final StateManager _stateManager;
+
+ private MessageStoreRecoveryHandler _messageRecoveryHandler;
+
+ private TransactionLogRecoveryHandler _tlogRecoveryHandler;
+
+ private ConfigurationRecoveryHandler _configRecoveryHandler;
+
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
+ private final EventManager _eventManager = new EventManager();
+ private final String _type;
+ private VirtualHost _virtualHost;
+
+ private final EnvironmentFacadeFactory _environmentFacadeFactory;
+
+ private volatile Committer _committer;
+
+ public BDBMessageStore()
+ {
+ this(new StandardEnvironmentFacadeFactory());
+ }
+
+ public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
+ {
+ _type = environmentFacadeFactory.getType();;
+ _environmentFacadeFactory = environmentFacadeFactory;
+ _stateManager = new StateManager(_eventManager);
+ }
@Override
- protected void setupStore(File storePath, String name) throws DatabaseException
+ public void addEventListener(EventListener eventListener, Event... events)
{
- super.setupStore(storePath, name);
+ _eventManager.addEventListener(eventListener, events);
+ }
- _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
- _commitThreadWrapper.startCommitThread();
+ @Override
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ {
+ _stateManager.attainState(State.INITIALISING);
+
+ _configRecoveryHandler = recoveryHandler;
+ _virtualHost = virtualHost;
+ }
+
+ @Override
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws StoreException
+ {
+ if(_stateManager.isInState(State.INITIAL))
+ {
+ // Is acting as a message store, but not a durable config store
+ _stateManager.attainState(State.INITIALISING);
+ }
+
+ _messageRecoveryHandler = messageRecoveryHandler;
+ _tlogRecoveryHandler = tlogRecoveryHandler;
+ _virtualHost = virtualHost;
+
+
+ completeInitialisation();
}
- protected Environment createEnvironment(File environmentPath) throws DatabaseException
+ private void completeInitialisation() throws StoreException
{
- LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = createEnvironmentConfig();
+ configure(_virtualHost, _messageRecoveryHandler != null);
+ _stateManager.attainState(State.INITIALISED);
+ }
+
+ private void startActivation() throws StoreException
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
try
{
- return new Environment(environmentPath, envConfig);
+ new Upgrader(_environmentFacade.getEnvironment(), _virtualHost.getName()).upgradeIfNecessary();
+ _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES);
+ _totalStoreSize = getSizeOnDisk();
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot configure store", e);
}
- catch (DatabaseException de)
+
+ }
+
+ @Override
+ public synchronized void activate() throws StoreException
+ {
+ // check if acting as a durable config store, but not a message store
+ if(_stateManager.isInState(State.INITIALISING))
{
- if (de.getMessage().contains("Environment.setAllowCreate is false"))
+ completeInitialisation();
+ }
+
+ _stateManager.attainState(State.ACTIVATING);
+ startActivation();
+
+ if(_configRecoveryHandler != null)
+ {
+ recoverConfig(_configRecoveryHandler);
+ }
+ if(_messageRecoveryHandler != null)
+ {
+ recoverMessages(_messageRecoveryHandler);
+ }
+ if(_tlogRecoveryHandler != null)
+ {
+ recoverQueueEntries(_tlogRecoveryHandler);
+ }
+
+ _stateManager.attainState(State.ACTIVE);
+ }
+
+ @Override
+ public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
+ {
+ return new BDBTransaction();
+ }
+
+ private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException
+ {
+ Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
+ Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+
+
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore);
+
+ _committer = _environmentFacade.createCommitter(virtualHost.getName());
+ _committer.start();
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ if (_environmentFacade == null)
+ {
+ return null;
+ }
+ return _environmentFacade.getStoreLocation();
+ }
+
+ public EnvironmentFacade getEnvironmentFacade()
+ {
+ return _environmentFacade;
+ }
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws Exception If the close fails.
+ */
+ @Override
+ public void close() throws StoreException
+ {
+ if (_closed.compareAndSet(false, true))
+ {
+ _stateManager.attainState(State.CLOSING);
+ try
{
- //Allow the creation this time
- envConfig.setAllowCreate(true);
- return new Environment(environmentPath, envConfig);
+ try
+ {
+ _committer.stop();
+ }
+ finally
+ {
+ closeEnvironment();
+ }
}
- else
+ catch(DatabaseException e)
+ {
+ throw new StoreException("Exception occured on message store close", e);
+ }
+ _stateManager.attainState(State.CLOSED);
+ }
+ }
+
+ private void closeEnvironment()
+ {
+ if (_environmentFacade != null)
+ {
+ _environmentFacade.close();
+ }
+ }
+
+ private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException
+ {
+ try
+ {
+ final int configVersion = getConfigVersion();
+ recoveryHandler.beginConfigurationRecovery(this, configVersion);
+ loadConfiguredObjects(recoveryHandler);
+
+ final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
+ if(newConfigVersion != configVersion)
+ {
+ updateConfigVersion(newConfigVersion);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+
+ }
+
+ @SuppressWarnings("resource")
+ private void updateConfigVersion(int newConfigVersion) throws StoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ cursor = getConfigVersionDb().openCursor(txn, null);
+ DatabaseEntry key = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0,key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ IntegerBinding.intToEntry(newConfigVersion, value);
+ OperationStatus status = cursor.put(key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error setting config version: " + status);
+ }
+ }
+ cursor.close();
+ cursor = null;
+ txn.commit();
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+ }
+
+ private int getConfigVersion() throws StoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = getConfigVersionDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ return IntegerBinding.entryToInt(value);
+ }
+
+ // Insert 0 as the default config version
+ IntegerBinding.intToEntry(0,value);
+ ByteBinding.byteToEntry((byte) 0,key);
+ OperationStatus status = getConfigVersionDb().put(null, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error initialising config version: " + status);
+ }
+ return 0;
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = getConfiguredObjectsDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value);
+ crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes());
+ }
+
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ private void closeCursorSafely(Cursor cursor) throws StoreException
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot close cursor", e);
+ }
+ }
+ }
+
+
+ private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException
+ {
+ StoredMessageRecoveryHandler mrh = msrh.begin();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getMessageMetaDataDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+ long maxId = 0;
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+ mrh.message(message);
+
+ maxId = Math.max(maxId, messageId);
+ }
+
+ _messageId.set(maxId);
+ mrh.completeMessageRecovery();
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover messages", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
+ throws StoreException
+ {
+ QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
+
+ ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey qek = keyBinding.entryToObject(key);
+
+ entries.add(qek);
+ }
+
+ try
{
- throw de;
+ cursor.close();
+ }
+ finally
+ {
+ cursor = null;
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ qerh.queueEntry(queueId, messageId);
}
}
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
+
+ cursor = null;
+ try
+ {
+ cursor = getXidDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
+ preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+
+ dtxrh.completeDtxRecordRecovery();
+ }
+
+ public void removeMessage(long messageId, boolean sync) throws StoreException
+ {
+
+ boolean complete = false;
+ com.sleepycat.je.Transaction tx = null;
+
+ Random rand = null;
+ int attempts = 0;
+ try
+ {
+ do
+ {
+ tx = null;
+ try
+ {
+ tx = _environmentFacade.getEnvironment().beginTransaction(null, null);
+
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Removing message id " + messageId);
+ }
+
+
+ OperationStatus status = getMessageMetaDataDb().delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleted metadata for message " + messageId);
+ }
+
+ //now remove the content data from the store if there is any.
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ getMessageContentDb().delete(tx, contentKeyEntry);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleted content for message " + messageId);
+ }
+
+ _environmentFacade.commit(tx);
+ _committer.commit(tx, sync);
+
+ complete = true;
+ tx = null;
+ }
+ catch (LockConflictException e)
+ {
+ try
+ {
+ if(tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch(DatabaseException e2)
+ {
+ LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e);
+ }
+
+
+ LOGGER.warn("Lock timeout exception. Retrying (attempt "
+ + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
+
+ if(++attempts < LOCK_RETRY_ATTEMPTS)
+ {
+ if(rand == null)
+ {
+ rand = new Random();
+ }
+
+ try
+ {
+ Thread.sleep(500l + (long)(500l * rand.nextDouble()));
+ }
+ catch (InterruptedException e1)
+ {
+
+ }
+ }
+ else
+ {
+ // rethrow the lock conflict exception since we could not solve by retrying
+ throw _environmentFacade.handleDatabaseException("Cannot remove messages", e);
+ }
+ }
+ }
+ while(!complete);
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.error("Unexpected BDB exception", e);
+
+ try
+ {
+ abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
+ }
+ finally
+ {
+ tx = null;
+ }
+
+ throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ try
+ {
+ abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
+ }
+ finally
+ {
+ tx = null;
+ }
+ }
+ }
+
+ private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx)
+ {
+ try
+ {
+ if (tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch (DatabaseException e1)
+ {
+ // We need the possible side effect of the handler restarting the environment but don't care about the exception
+ _environmentFacade.handleDatabaseException(null, e1);
+ LOGGER.warn(errorMessage, e1);
+ }
}
@Override
- protected void closeInternal()
+ public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes);
+ storeConfiguredObjectEntry(configuredObject);
+ }
+ }
+
+ @Override
+ public void remove(UUID id, String type) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called");
+ }
+ OperationStatus status = removeConfiguredObject(null, id);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Configured object of type " + type + " with id " + id + " not found");
+ }
+ }
+
+ @Override
+ public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException
+ {
+ com.sleepycat.je.Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ for(UUID id : objects)
+ {
+ if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS)
+ {
+ removed.add(id);
+ }
+ }
+ commitTransaction(txn);
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ private void commitTransaction(com.sleepycat.je.Transaction txn) throws StoreException
+ {
+ try
+ {
+ txn.commit();
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot commit transaction on configured objects removal", e);
+ }
+ }
+
+ @Override
+ public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException
+ {
+ update(false, id, type, attributes, null);
+ }
+
+ @Override
+ public void update(ConfiguredObjectRecord... records) throws StoreException
+ {
+ update(false, records);
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
+ {
+ com.sleepycat.je.Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ for(ConfiguredObjectRecord record : records)
+ {
+ update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn);
+ }
+ commitTransaction(txn);
+ }
+
+ private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Updating " +type + ", id: " + id);
+ }
+
+ try
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
+ keyBinding.objectToEntry(id, key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ DatabaseEntry newValue = new DatabaseEntry();
+ ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
+
+ OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND))
+ {
+ ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
+
+ // write the updated entry to the store
+ configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
+ status = getConfiguredObjectsDb().put(txn, key, newValue);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error updating configuration details within the store: " + status);
+ }
+ }
+ else if (status != OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Error finding configuration details within the store: " + status);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ if (txn != null)
+ {
+ abortTransactionIgnoringException("Error updating configuration details within the store: " + e.getMessage(), txn);
+ }
+ throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e);
+ }
+ }
+
+ /**
+ * Places a message onto a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The the queue to place the message on.
+ * @param messageId The message to enqueue.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws StoreException
+ {
+
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
+ keyBinding.objectToEntry(dd, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0, value);
+
+ try
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
+ + " in transaction " + tx);
+ }
+ getDeliveryDb().put(tx, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
+ throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
+ + " to database", e);
+ }
+ }
+
+ /**
+ * Extracts a message from a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The queue to take the message from.
+ * @param messageId The message to dequeue.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws StoreException
+ {
+
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
+ UUID id = queue.getId();
+ keyBinding.objectToEntry(queueEntryKey, key);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Dequeue message id " + messageId + " from queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ }
+
+ try
+ {
+
+ OperationStatus status = getDeliveryDb().delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Unable to find message with id " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Unable to remove message with id " + messageId + " on queue"
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Removed message " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
+ + " from delivery db");
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+
+ LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e);
+
+ throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e);
+ }
+ }
+
+
+ private void recordXid(com.sleepycat.je.Transaction txn,
+ long format,
+ byte[] globalId,
+ byte[] branchId,
+ org.apache.qpid.server.store.Transaction.Record[] enqueues,
+ org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
{
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidBinding keyBinding = XidBinding.getInstance();
+ keyBinding.objectToEntry(xid,key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ valueBinding.objectToEntry(preparedTransaction, value);
+
try
{
- _commitThreadWrapper.stopCommitThread();
+ getXidDb().put(txn, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.error("Failed to write xid: " + e.getMessage(), e);
+ throw _environmentFacade.handleDatabaseException("Error writing xid to database", e);
}
- catch (InterruptedException e)
+ }
+
+ private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
+ throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidBinding keyBinding = XidBinding.getInstance();
+
+ keyBinding.objectToEntry(xid, key);
+
+
+ try
+ {
+
+ OperationStatus status = getXidDb().delete(txn, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Unable to find xid");
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Unable to remove xid");
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+
+ LOGGER.error("Failed to remove xid in transaction " + txn, e);
+
+ throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Commits all operations performed within a given transaction.
+ *
+ * @param tx The transaction to commit all operations for.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException
+ {
+ if (tx == null)
+ {
+ throw new StoreException("Fatal internal error: transactional is null at commitTran");
+ }
+
+ _environmentFacade.commit(tx);
+ StoreFuture result = _committer.commit(tx, syncCommit);
+
+ if (LOGGER.isDebugEnabled())
{
- throw new StoreException(e);
+ String transactionType = syncCommit ? "synchronous" : "asynchronous";
+ LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
}
- super.closeInternal();
+ return result;
+ }
+
+ /**
+ * Abandons all operations performed within a given transaction.
+ *
+ * @param tx The transaction to abandon.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ public void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("abortTran called for transaction " + tx);
+ }
+
+ try
+ {
+ tx.abort();
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Primarily for testing purposes.
+ *
+ * @param queueId
+ *
+ * @return a list of message ids for messages enqueued for a particular queue
+ */
+ List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+
+ DatabaseEntry key = new DatabaseEntry();
+
+ QueueEntryKey dd = new QueueEntryKey(queueId, 0);
+
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ keyBinding.objectToEntry(dd, key);
+
+ DatabaseEntry value = new DatabaseEntry();
+
+ LinkedList<Long> messageIds = new LinkedList<Long>();
+
+ OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
+ dd = keyBinding.entryToObject(key);
+
+ while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
+ {
+
+ messageIds.add(dd.getMessageId());
+ status = cursor.getNext(key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
+ {
+ dd = keyBinding.entryToObject(key);
+ }
+ }
+
+ return messageIds;
+ }
+ catch (DatabaseException e)
+ {
+ throw new StoreException("Database error: " + e.getMessage(), e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ /**
+ * Return a valid, currently unused message id.
+ *
+ * @return A fresh message id.
+ */
+ public long getNewMessageId()
+ {
+ return _messageId.incrementAndGet();
+ }
+
+ /**
+ * Stores a chunk of message data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param offset The offset of the data chunk in the message.
+ * @param contentBody The content of the data chunk.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
+ ByteBuffer contentBody) throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding messageBinding = ContentBinding.getInstance();
+ messageBinding.objectToEntry(contentBody.array(), value);
+ try
+ {
+ OperationStatus status = getMessageContentDb().put(tx, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error adding content for message id " + messageId + ": " + status);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Stores message meta-data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param messageMetaData The message meta data to store.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
+ StorableMessageMetaData messageMetaData)
+ throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("storeMetaData called for transaction " + tx
+ + ", messageId " + messageId
+ + ", messageMetaData " + messageMetaData);
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
+ messageBinding.objectToEntry(messageMetaData, value);
+ try
+ {
+ getMessageMetaDataDb().put(tx, key, value);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Retrieves message meta-data.
+ *
+ * @param messageId The message to get the meta-data for.
+ *
+ * @return The message meta data.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ public StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = "
+ + messageId + "): called");
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
+
+ try
+ {
+ OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Metadata not found for message with id " + messageId);
+ }
+
+ StorableMessageMetaData mdd = messageBinding.entryToObject(value);
+
+ return mdd;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
+ * from the specified offset in the message.
+ *
+ * @param messageId The message to get the data for.
+ * @param offset The offset of the data within the message.
+ * @param dst The destination of the content read back
+ *
+ * @return The number of bytes inserted into the destination
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ public int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
+ {
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding contentTupleBinding = ContentBinding.getInstance();
+
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
+ }
+
+ try
+ {
+
+ int written = 0;
+ OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+ if (status == OperationStatus.SUCCESS)
+ {
+ byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
+ int size = dataAsBytes.length;
+ if (offset > size)
+ {
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
+
+ }
+
+ written = size - offset;
+ if(written > dst.remaining())
+ {
+ written = dst.remaining();
+ }
+
+ dst.put(dataAsBytes, offset, written);
+ }
+ return written;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
}
@Override
- protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
+ @SuppressWarnings("unchecked")
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ {
+ if(metaData.isPersistent())
+ {
+ return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage(getNewMessageId(), metaData);
+ }
+ }
+
+ /**
+ * Makes the specified configured object persistent.
+ *
+ * @param configuredObject Details of the configured object to store.
+ * @throws StoreException If the operation fails for any reason.
+ */
+ private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ LOGGER.debug("Storing configured object: " + configuredObject);
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
+ keyBinding.objectToEntry(configuredObject.getId(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
+
+ queueBinding.objectToEntry(configuredObject, value);
+ try
+ {
+ OperationStatus status = getConfiguredObjectsDb().put(null, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error writing configured object " + configuredObject + " to database: "
+ + status);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
+ + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws StoreException
+ {
+
+ LOGGER.debug("Removing configured object: " + id);
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(id, key);
+ try
+ {
+ return getConfiguredObjectsDb().delete(tx, key);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error deleting of configured object with id " + id + " from database", e);
+ }
+ }
+
+
+
+ private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
+ {
+
+ private final long _messageId;
+ private final boolean _isRecovered;
+
+ private StorableMessageMetaData _metaData;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, false);
+ }
+
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
+ {
+ _messageId = messageId;
+ _isRecovered = isRecovered;
+
+ if(!_isRecovered)
+ {
+ _metaData = metaData;
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ StorableMessageMetaData metaData = _metaDataRef.get();
+ if(metaData == null)
+ {
+ metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ return metaData;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+ {
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
+ }
+
+ public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ return ByteBuffer.wrap(data,offsetInMessage,size);
+ }
+ else
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ int length = getContent(offsetInMessage, buf);
+ buf.limit(length);
+ buf.position(0);
+ return buf;
+ }
+ }
+
+ synchronized void store(com.sleepycat.je.Transaction txn)
+ {
+ if (!stored())
+ {
+ try
+ {
+ _dataRef = new SoftReference<byte[]>(_data);
+ BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
+ BDBMessageStore.this.addContent(txn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+ }
+
+ public synchronized StoreFuture flushToStore()
+ {
+ if(!stored())
+ {
+ com.sleepycat.je.Transaction txn;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(
+ null, null);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
+ }
+ store(txn);
+ _environmentFacade.commit(txn);
+ _committer.commit(txn, true);
+
+ storedSizeChangeOccured(getMetaData().getContentSize());
+ }
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ public void remove()
+ {
+ int delta = getMetaData().getContentSize();
+ BDBMessageStore.this.removeMessage(_messageId, false);
+ storedSizeChangeOccured(-delta);
+ }
+
+ private boolean stored()
+ {
+ return _metaData == null || _isRecovered;
+ }
+ }
+
+ private class BDBTransaction implements org.apache.qpid.server.store.Transaction
+ {
+ private com.sleepycat.je.Transaction _txn;
+ private int _storeSizeIncrease;
+
+ private BDBTransaction() throws StoreException
+ {
+ try
+ {
+ _txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e);
+ }
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ {
+ if(message.getStoredMessage() instanceof StoredBDBMessage)
+ {
+ final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+
+ BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ {
+ BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void commitTran() throws StoreException
+ {
+ BDBMessageStore.this.commitTranImpl(_txn, true);
+ BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
+ }
+
+ public StoreFuture commitTranAsync() throws StoreException
+ {
+ BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
+ return BDBMessageStore.this.commitTranImpl(_txn, false);
+ }
+
+ public void abortTran() throws StoreException
+ {
+ BDBMessageStore.this.abortTran(_txn);
+ }
+
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
+ {
+ BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
+ Record[] dequeues) throws StoreException
+ {
+ BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
+ }
+ }
+
+ private void storedSizeChangeOccured(final int delta) throws StoreException
{
try
{
- tx.commitNoSync();
+ storedSizeChange(delta);
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Stored size change exception", e);
}
- catch(DatabaseException de)
+ }
+
+ private void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
{
- LOGGER.error("Got DatabaseException on commit, closing environment", de);
+ synchronized (this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 2*delta;
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ _totalStoreSize = getSizeOnDisk();
+
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ _totalStoreSize = getSizeOnDisk();
+
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk();
- closeEnvironmentSafely();
+ _totalStoreSize = getSizeOnDisk();
- throw de;
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ }
+ }
+
+ private void reduceSizeOnDisk()
+ {
+ _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
+ boolean cleaned = false;
+ while (_environmentFacade.getEnvironment().cleanLog() > 0)
+ {
+ cleaned = true;
+ }
+ if (cleaned)
+ {
+ CheckpointConfig force = new CheckpointConfig();
+ force.setForce(true);
+ _environmentFacade.getEnvironment().checkpoint(force);
}
- return _commitThreadWrapper.commit(tx, syncCommit);
+
+ _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ }
+
+ private long getSizeOnDisk()
+ {
+ return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize();
+ }
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+
+ @Override
+ public void onDelete()
+ {
+ String storeLocation = getStoreLocation();
+
+ if (storeLocation != null)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting store " + storeLocation);
+ }
+
+ File location = new File(storeLocation);
+ if (location.exists())
+ {
+ if (!FileUtils.delete(location, true))
+ {
+ LOGGER.error("Cannot delete " + storeLocation);
+ }
+ }
+ }
}
@Override
public String getStoreType()
{
- return TYPE;
+ return _type;
+ }
+
+ private Database getMessageContentDb()
+ {
+ return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME);
+ }
+
+ private Database getConfiguredObjectsDb()
+ {
+ return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
+ }
+
+ private Database getConfigVersionDb()
+ {
+ return _environmentFacade.getOpenDatabase(CONFIG_VERSION_DB_NAME);
+ }
+
+ private Database getMessageMetaDataDb()
+ {
+ return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME);
+ }
+
+ private Database getDeliveryDb()
+ {
+ return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME);
+ }
+
+ private Database getXidDb()
+ {
+ return _environmentFacade.getOpenDatabase(XID_DB_NAME);
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index d7c8b23d39..4abe81c56c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
@@ -37,7 +38,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
@Override
public String getType()
{
- return BDBMessageStore.TYPE;
+ return StandardEnvironmentFacade.TYPE;
}
@Override
@@ -71,7 +72,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
if(initialSize != 0)
{
- return Collections.singletonMap("bdbEnvironmentConfig", (Object)attributes);
+ return Collections.singletonMap(BDBMessageStore.ENVIRONMENT_CONFIGURATION, (Object)attributes);
}
else
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
new file mode 100644
index 0000000000..a137e38baf
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -0,0 +1,313 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public class CoalescingCommiter implements Committer
+{
+ private final CommitThread _commitThread;
+
+ public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
+ {
+ _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
+ }
+
+ @Override
+ public void start()
+ {
+ _commitThread.start();
+ }
+
+ @Override
+ public void stop()
+ {
+ _commitThread.close();
+ try
+ {
+ _commitThread.join();
+ }
+ catch (InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Commit thread has not shutdown", ie);
+ }
+ }
+
+ @Override
+ public StoreFuture commit(Transaction tx, boolean syncCommit)
+ {
+ BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ commitFuture.commit();
+ return commitFuture;
+ }
+
+ private static final class BDBCommitFuture implements StoreFuture
+ {
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+
+ private final CommitThread _commitThread;
+ private final Transaction _tx;
+ private final boolean _syncCommit;
+ private RuntimeException _databaseException;
+ private boolean _complete;
+
+ public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ {
+ _commitThread = commitThread;
+ _tx = tx;
+ _syncCommit = syncCommit;
+ }
+
+ public synchronized void complete()
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("complete() called for transaction " + _tx);
+ }
+ _complete = true;
+
+ notifyAll();
+ }
+
+ public synchronized void abort(RuntimeException databaseException)
+ {
+ _complete = true;
+ _databaseException = databaseException;
+
+ notifyAll();
+ }
+
+ public void commit() throws DatabaseException
+ {
+ _commitThread.addJob(this, _syncCommit);
+
+ if(!_syncCommit)
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
+ }
+ return;
+ }
+
+ waitForCompletion();
+
+ if (_databaseException != null)
+ {
+ throw _databaseException;
+ }
+
+ }
+
+ public synchronized boolean isComplete()
+ {
+ return _complete;
+ }
+
+ public synchronized void waitForCompletion()
+ {
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
+ while (!isComplete())
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(250);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
+ }
+
+ /**
+ * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
+ * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
+ * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
+ */
+ private static class CommitThread extends Thread
+ {
+ private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
+
+ private final AtomicBoolean _stopped = new AtomicBoolean(false);
+ private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Object _lock = new Object();
+ private final EnvironmentFacade _environmentFacade;
+
+ public CommitThread(String name, EnvironmentFacade environmentFacade)
+ {
+ super(name);
+ _environmentFacade = environmentFacade;
+ }
+
+ public void explicitNotify()
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+
+ public void run()
+ {
+ while (!_stopped.get())
+ {
+ synchronized (_lock)
+ {
+ while (!_stopped.get() && !hasJobs())
+ {
+ try
+ {
+ // Periodically wake up and check, just in case we
+ // missed a notification. Don't want to lock the broker hard.
+ _lock.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ processJobs();
+ }
+ }
+
+ private void processJobs()
+ {
+ int size = _jobQueue.size();
+
+ try
+ {
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
+ Environment environment = _environmentFacade.getEnvironment();
+ if (environment != null && environment.isValid())
+ {
+ environment.flushLog(true);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("flushLog completed in " + duration + " ms");
+ }
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.complete();
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ try
+ {
+ LOGGER.error("Exception during environment log flush", e);
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.abort(e);
+ }
+ }
+ finally
+ {
+ LOGGER.error("Closing store environment", e);
+
+ try
+ {
+ _environmentFacade.close();
+ }
+ catch (DatabaseException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ }
+ }
+ }
+
+ private boolean hasJobs()
+ {
+ return !_jobQueue.isEmpty();
+ }
+
+ public void addJob(BDBCommitFuture commit, final boolean sync)
+ {
+ if (_stopped.get())
+ {
+ throw new IllegalStateException("Commit thread is stopped");
+ }
+ _jobQueue.add(commit);
+ if(sync)
+ {
+ synchronized (_lock)
+ {
+ _lock.notifyAll();
+ }
+ }
+ }
+
+ public void close()
+ {
+ RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
+ synchronized (_lock)
+ {
+ _stopped.set(true);
+ BDBCommitFuture commit = null;
+ while ((commit = _jobQueue.poll()) != null)
+ {
+ commit.abort(e);
+ }
+ _lock.notifyAll();
+ }
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index 7f7b65f315..36ee2ad306 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,27 +20,36 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.server.store.StoreFuture;
-import static org.mockito.Mockito.mock;
+import com.sleepycat.je.Transaction;
-public class HAMessageStoreSmokeTest extends QpidTestCase
+public interface Committer
{
- private final BDBHAMessageStore _store = new BDBHAMessageStore();
+ void start();
+
+ StoreFuture commit(Transaction tx, boolean syncCommit);
+
+ void stop();
- public void testMissingHAConfigThrowsException() throws Exception
+ Committer IMMEDIATE_FUTURE_COMMITTER = new Committer()
{
- try
+
+ @Override
+ public void start()
{
- _store.configure(mock(VirtualHost.class));
- fail("Expected an exception to be thrown");
}
- catch (ServerScopedRuntimeException ce)
+
+ @Override
+ public StoreFuture commit(Transaction tx, boolean syncCommit)
{
- assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
+ return StoreFuture.IMMEDIATE_FUTURE;
}
- }
-}
+
+ @Override
+ public void stop()
+ {
+ }
+ };
+
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
new file mode 100644
index 0000000000..144ab83238
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+
+public interface EnvironmentFacade
+{
+ @SuppressWarnings("serial")
+ final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
+ // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84
+ put(EnvironmentConfig.STATS_COLLECT, "false");
+ }});
+
+ Environment getEnvironment();
+
+ Committer createCommitter(String name);
+
+ void openDatabases(DatabaseConfig dbConfig, String... databaseNames);
+
+ Database getOpenDatabase(String name);
+
+ void commit(com.sleepycat.je.Transaction tx);
+
+ DatabaseException handleDatabaseException(String contextMessage, DatabaseException e);
+
+ String getStoreLocation();
+
+ void close();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index 59483751ca..b784e436b9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,13 +18,15 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.model.VirtualHost;
-public interface HAMessageStore extends MessageStore
+public interface EnvironmentFacadeFactory
{
- /**
- * Used to indicate that a store requires to make itself unavailable for read and read/write
- * operations.
- */
- void passivate();
+
+ EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore);
+
+ String getType();
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java
new file mode 100644
index 0000000000..b13766a136
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
+
+public class LoggingAsyncExceptionListener implements ExceptionListener
+{
+ private static final Logger LOGGER = Logger.getLogger(LoggingAsyncExceptionListener.class);
+
+ @Override
+ public void exceptionThrown(ExceptionEvent event)
+ {
+ LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
new file mode 100644
index 0000000000..8117ca1a9a
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -0,0 +1,228 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+
+public class StandardEnvironmentFacade implements EnvironmentFacade
+{
+ private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class);
+ public static final String TYPE = "BDB";
+
+ private final String _storePath;
+ private final Map<String, Database> _databases = new HashMap<String, Database>();
+
+ private Environment _environment;
+
+ public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
+ {
+ _storePath = storePath;
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Creating environment at environment path " + _storePath);
+ }
+
+ File environmentPath = new File(storePath);
+ if (!environmentPath.exists())
+ {
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+
+ for (Map.Entry<String, String> configItem : attributes.entrySet())
+ {
+ LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+
+ envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+
+ _environment = new Environment(environmentPath, envConfig);
+ }
+
+ @Override
+ public void commit(com.sleepycat.je.Transaction tx)
+ {
+ try
+ {
+ tx.commitNoSync();
+ }
+ catch (DatabaseException de)
+ {
+ LOGGER.error("Got DatabaseException on commit, closing environment", de);
+
+ closeEnvironmentSafely();
+
+ throw handleDatabaseException("Got DatabaseException on commit", de);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ closeDatabases();
+ closeEnvironment();
+ }
+
+ private void closeDatabases()
+ {
+ RuntimeException firstThrownException = null;
+ for (Database database : _databases.values())
+ {
+ try
+ {
+ database.close();
+ }
+ catch(RuntimeException e)
+ {
+ if (firstThrownException == null)
+ {
+ firstThrownException = e;
+ }
+ }
+ }
+ if (firstThrownException != null)
+ {
+ throw firstThrownException;
+ }
+ }
+
+ private void closeEnvironmentSafely()
+ {
+ if (_environment != null)
+ {
+ if (_environment.isValid())
+ {
+ try
+ {
+ closeDatabases();
+ }
+ catch(Exception e)
+ {
+ LOGGER.error("Exception closing environment databases", e);
+ }
+ }
+ try
+ {
+ _environment.close();
+ }
+ catch (DatabaseException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ catch (IllegalStateException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ finally
+ {
+ _environment = null;
+ }
+ }
+ }
+
+ @Override
+ public Environment getEnvironment()
+ {
+ return _environment;
+ }
+
+ private void closeEnvironment()
+ {
+ if (_environment != null)
+ {
+ // Clean the log before closing. This makes sure it doesn't contain
+ // redundant data. Closing without doing this means the cleaner may
+ // not get a chance to finish.
+ try
+ {
+ _environment.cleanLog();
+ }
+ finally
+ {
+ _environment.close();
+ _environment = null;
+ }
+ }
+ }
+
+ @Override
+ public DatabaseException handleDatabaseException(String contextMessage, DatabaseException e)
+ {
+ if (_environment != null && !_environment.isValid())
+ {
+ closeEnvironmentSafely();
+ }
+ return e;
+ }
+
+ @Override
+ public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
+ {
+ for (String databaseName : databaseNames)
+ {
+ Database database = _environment.openDatabase(null, databaseName, dbConfig);
+ _databases .put(databaseName, database);
+ }
+ }
+
+ @Override
+ public Database getOpenDatabase(String name)
+ {
+ Database database = _databases.get(name);
+ if (database == null)
+ {
+ throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+ }
+ return database;
+ }
+
+ @Override
+ public Committer createCommitter(String name)
+ {
+ return new CoalescingCommiter(name, this);
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _storePath;
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
new file mode 100644
index 0000000000..384ceba98a
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.model.VirtualHost;
+
+public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
+{
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
+ {
+ Map<String, String> envConfigMap = new HashMap<String, String>();
+ envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+
+ Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION);
+ if (environmentConfigurationAttributes instanceof Map)
+ {
+ envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
+ }
+
+ String name = virtualHost.getName();
+ final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name;
+
+ String storeLocation;
+ if(isMessageStore)
+ {
+ storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
+ }
+ else // we are acting only as the durable config store
+ {
+ storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
+ }
+
+ return new StandardEnvironmentFacade(storeLocation, envConfigMap);
+ }
+
+ @Override
+ public String getType()
+ {
+ return StandardEnvironmentFacade.TYPE;
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
new file mode 100644
index 0000000000..38fdf34196
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
+
+public class DatabasePinger
+{
+ public static final String PING_DATABASE_NAME = "PINGDB";
+ private static final int ID = 0;
+
+ public void pingDb(EnvironmentFacade facade)
+ {
+ try
+ {
+ final Database db = facade.getOpenDatabase(PING_DATABASE_NAME);
+
+ DatabaseEntry key = new DatabaseEntry();
+ IntegerBinding.intToEntry(ID, key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ LongBinding.longToEntry(System.currentTimeMillis(), value);
+ Transaction txn = null;
+ try
+ {
+ txn = facade.getEnvironment().beginTransaction(null, null);
+ db.put(txn, key, value);
+ txn.commit();
+ txn = null;
+ }
+ finally
+ {
+ try
+ {
+ if (txn != null)
+ {
+ txn.abort();
+ }
+ }
+ finally
+ {
+ db.close();
+ }
+ }
+ }
+ catch (DatabaseException de)
+ {
+ facade.handleDatabaseException("DatabaseException from DatabasePinger ", de);
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
new file mode 100644
index 0000000000..76a48c189e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Map;
+
+public interface ReplicatedEnvironmentConfiguration
+{
+ String getName();
+ String getGroupName();
+ String getHostPort();
+ String getHelperHostPort();
+ String getDurability();
+ boolean isCoalescingSync();
+ boolean isDesignatedPrimary();
+ int getPriority();
+ int getQuorumOverride();
+ String getStorePath();
+ Map<String, String> getParameters();
+ Map<String, String> getReplicationParameters();
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
new file mode 100644
index 0000000000..02181611c2
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -0,0 +1,1052 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
+import org.apache.qpid.server.store.berkeleydb.Committer;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
+import org.apache.qpid.server.util.DaemonThreadFactory;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.RepInternal;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.RestartRequiredException;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
+import com.sleepycat.je.rep.vlsn.VLSNRange;
+import com.sleepycat.je.utilint.PropUtil;
+import com.sleepycat.je.utilint.VLSN;
+
+public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
+{
+ public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
+ public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval";
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
+
+ private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
+ private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000;
+
+ private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
+ private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
+
+ @SuppressWarnings("serial")
+ private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ /**
+ * Parameter decreased as the 24h default may lead very large log files for most users.
+ */
+ put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
+ /**
+ * Parameter increased as the 5 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
+ /**
+ * Parameter increased as the 10 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
+ /**
+ * Parameter decreased as the 10 h default may cause user confusion.
+ */
+ put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
+ /**
+ * Parameter changed from default (off) to allow the Environment to start in the
+ * UNKNOWN state when the majority is not available.
+ */
+ put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
+ /**
+ * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False
+ * is scheduled to become default after JE 5.1.
+ */
+ put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
+ /**
+ * Parameter decreased as a default 5min interval may lead to bigger data losses on Node
+ * with NO_SYN durability in case if such Node crushes.
+ */
+ put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+ }});
+
+ public static final String TYPE = "BDB-HA";
+
+ // TODO: JMX will change to observe the model, at that point these names will disappear
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
+ private final ReplicatedEnvironmentConfiguration _configuration;
+ private final Durability _durability;
+ private final Boolean _coalescingSync;
+ private final String _prettyGroupNodeName;
+ private final File _environmentDirectory;
+
+ private final ExecutorService _environmentJobExecutor;
+ private final ScheduledExecutorService _groupChangeExecutor;
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
+ private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
+ private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
+
+ private volatile ReplicatedEnvironment _environment;
+ private volatile long _joinTime;
+ private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+
+ public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
+ {
+ _environmentDirectory = new File(configuration.getStorePath());
+ if (!_environmentDirectory.exists())
+ {
+ if (!_environmentDirectory.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+
+ _configuration = configuration;
+
+ _durability = Durability.parse(_configuration.getDurability());
+ _coalescingSync = _configuration.isCoalescingSync();
+ _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
+
+ // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
+ _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
+ _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+ _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), 100, TimeUnit.MILLISECONDS); // TODO make configurable
+
+ // create environment in a separate thread to avoid renaming of the current thread by JE
+ _environment = createEnvironment(true);
+ }
+
+ @Override
+ public void commit(final Transaction tx)
+ {
+ try
+ {
+ // Using commit() instead of commitNoSync() for the HA store to allow
+ // the HA durability configuration to influence resulting behaviour.
+ tx.commit();
+ }
+ catch (DatabaseException de)
+ {
+ throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ if (_state.compareAndSet(State.OPENING, State.CLOSING) ||
+ _state.compareAndSet(State.OPEN, State.CLOSING) ||
+ _state.compareAndSet(State.RESTARTING, State.CLOSING) )
+ {
+ try
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
+ }
+
+ _environmentJobExecutor.shutdown();
+ _groupChangeExecutor.shutdown();
+ closeDatabases();
+ closeEnvironment();
+ }
+ finally
+ {
+ _state.compareAndSet(State.CLOSING, State.CLOSED);
+ }
+ }
+ }
+
+ @Override
+ public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe)
+ {
+ boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException);
+ if (restart)
+ {
+ tryToRestartEnvironment(dbe);
+ }
+ return dbe;
+ }
+
+ private void tryToRestartEnvironment(final DatabaseException dbe)
+ {
+ if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+ {
+ if (dbe != null && LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
+ }
+
+ _environmentJobExecutor.execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ restartEnvironment();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Exception on environment restart", e);
+ }
+ }
+ });
+
+ }
+ else
+ {
+ LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
+ }
+ }
+
+ @Override
+ public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not in opened state");
+ }
+
+ if (!_environment.isValid())
+ {
+ throw new IllegalStateException("Environment is not valid");
+ }
+
+ if (_environment.getState() != ReplicatedEnvironment.State.MASTER)
+ {
+ throw new IllegalStateException("Databases can only be opened on Master node");
+ }
+
+ for (String databaseName : databaseNames)
+ {
+ _databases.put(databaseName, new DatabaseHolder(dbConfig));
+ }
+ for (String databaseName : databaseNames)
+ {
+ DatabaseHolder holder = _databases.get(databaseName);
+ openDatabaseInternally(databaseName, holder);
+ }
+ }
+
+ private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
+ {
+ Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
+ holder.setDatabase(database);
+ }
+
+ @Override
+ public Database getOpenDatabase(String name)
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not in opened state");
+ }
+
+ if (!_environment.isValid())
+ {
+ throw new IllegalStateException("Environment is not valid");
+ }
+ DatabaseHolder databaseHolder = _databases.get(name);
+ if (databaseHolder == null)
+ {
+ throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened");
+ }
+ Database database = databaseHolder.getDatabase();
+ if (database == null)
+ {
+ throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+ }
+ return database;
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _environmentDirectory.getAbsolutePath();
+ }
+
+ @Override
+ public void stateChange(final StateChangeEvent stateChangeEvent)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + stateChangeEvent.getState());
+ }
+
+ if (_state.get() != State.CLOSING && _state.get() != State.CLOSED)
+ {
+ _groupChangeExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ stateChanged(stateChangeEvent);
+ }
+ });
+ }
+ else
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Ignoring the state environment change event as the environment facade for node '" + _prettyGroupNodeName
+ + "' is in state " + _state.get());
+ }
+ }
+ }
+
+ private void stateChanged(StateChangeEvent stateChangeEvent)
+ {
+ ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+ if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
+ {
+ if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
+ {
+ LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
+ _joinTime = System.currentTimeMillis();
+ }
+ }
+
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ reopenDatabases();
+ }
+
+ StateChangeListener listener = _stateChangeListener.get();
+ if (listener != null)
+ {
+ listener.stateChange(stateChangeEvent);
+ }
+
+ if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN)
+ {
+ tryToRestartEnvironment(null);
+ }
+ _lastKnownEnvironmentState = state;
+ }
+
+ private void reopenDatabases()
+ {
+ DatabaseConfig pingDbConfig = new DatabaseConfig();
+ pingDbConfig.setTransactional(true);
+ pingDbConfig.setAllowCreate(true);
+
+ _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
+
+ for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+ {
+ openDatabaseInternally(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public String getGroupName()
+ {
+ return (String)_configuration.getGroupName();
+ }
+
+ public String getNodeName()
+ {
+ return _configuration.getName();
+ }
+
+ public String getHostPort()
+ {
+ return (String)_configuration.getHostPort();
+ }
+
+ public String getHelperHostPort()
+ {
+ return (String)_configuration.getHelperHostPort();
+ }
+
+ public String getDurability()
+ {
+ return _durability.toString();
+ }
+
+ public boolean isCoalescingSync()
+ {
+ return _coalescingSync;
+ }
+
+ public String getNodeState()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ return ReplicatedEnvironment.State.UNKNOWN.name();
+ }
+ ReplicatedEnvironment.State state = _environment.getState();
+ return state.toString();
+ }
+
+ public boolean isDesignatedPrimary()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not opened");
+ }
+ return _environment.getRepMutableConfig().getDesignatedPrimary();
+ }
+
+ public Future<Void> setDesignatedPrimary(final boolean isPrimary)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set designated primary on " + _prettyGroupNodeName + " to " + isPrimary);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setDesignatedPrimaryInternal(isPrimary);
+ return null;
+ }
+ });
+ }
+
+ void setDesignatedPrimaryInternal(final boolean isPrimary)
+ {
+ try
+ {
+ final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
+ _environment.setRepMutableConfig(newConfig);
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Node " + _prettyGroupNodeName + " successfully set designated primary : " + isPrimary);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
+ }
+ }
+
+ int getPriority()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not opened");
+ }
+ ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
+ return repConfig.getNodePriority();
+ }
+
+ public Future<Void> setPriority(final int priority)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set priority on " + _prettyGroupNodeName + " to " + priority);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setPriorityInternal(priority);
+ return null;
+ }
+ });
+ }
+
+ void setPriorityInternal(int priority)
+ {
+ try
+ {
+ final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority);
+ _environment.setRepMutableConfig(newConfig);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
+ }
+ }
+
+ int getElectableGroupSizeOverride()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not opened");
+ }
+ ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
+ return repConfig.getElectableGroupSizeOverride();
+ }
+
+ public Future<Void> setElectableGroupSizeOverride(final int electableGroupOverride)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set electable group override on " + _prettyGroupNodeName + " to " + electableGroupOverride);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setElectableGroupSizeOverrideInternal(electableGroupOverride);
+ return null;
+ }
+ });
+ }
+
+ void setElectableGroupSizeOverrideInternal(int electableGroupOverride)
+ {
+ try
+ {
+ final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride);
+ _environment.setRepMutableConfig(newConfig);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
+ }
+ }
+
+
+ public long getJoinTime()
+ {
+ return _joinTime ;
+ }
+
+ public long getLastKnownReplicationTransactionId()
+ {
+ if (_state.get() == State.OPEN)
+ {
+ VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange();
+ VLSN lastTxnEnd = range.getLastTxnEnd();
+ return lastTxnEnd.getSequence();
+ }
+ else
+ {
+ return -1L;
+ }
+ }
+
+ public List<Map<String, String>> getGroupMembers()
+ {
+ List<Map<String, String>> members = new ArrayList<Map<String, String>>();
+
+ for (ReplicationNode node : _environment.getGroup().getNodes())
+ {
+ Map<String, String> nodeMap = new HashMap<String, String>();
+ nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, node.getName());
+ nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
+ members.add(nodeMap);
+ }
+
+ return members;
+ }
+
+ public void removeNodeFromGroup(final String nodeName)
+ {
+ createReplicationGroupAdmin().removeMember(nodeName);
+ }
+
+ public void updateAddress(final String nodeName, final String newHostName, final int newPort)
+ {
+ createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+ }
+
+ private ReplicationGroupAdmin createReplicationGroupAdmin()
+ {
+ final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+ helpers.addAll(_environment.getRepConfig().getHelperSockets());
+
+ final ReplicationConfig repConfig = _environment.getRepConfig();
+ helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
+
+ return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
+ }
+
+
+ public ReplicatedEnvironment getEnvironment()
+ {
+ return _environment;
+ }
+
+ public State getFacadeState()
+ {
+ return _state.get();
+ }
+
+ public void setStateChangeListener(StateChangeListener stateChangeListener)
+ {
+ if (_stateChangeListener.compareAndSet(null, stateChangeListener))
+ {
+ _environment.setStateChangeListener(this);
+ }
+ else
+ {
+ throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName);
+ }
+ }
+
+ private void closeEnvironment()
+ {
+ // Clean the log before closing. This makes sure it doesn't contain
+ // redundant data. Closing without doing this means the cleaner may not
+ // get a chance to finish.
+ try
+ {
+ if (_environment.isValid())
+ {
+ _environment.cleanLog();
+ }
+ }
+ finally
+ {
+ _environment.close();
+ _environment = null;
+ }
+ }
+
+ private void restartEnvironment()
+ {
+ LOGGER.info("Restarting environment");
+
+ closeEnvironmentSafely();
+
+ _environment = createEnvironment(false);
+
+ if (_stateChangeListener.get() != null)
+ {
+ _environment.setStateChangeListener(this);
+ }
+
+ LOGGER.info("Environment is restarted");
+ }
+
+ private void closeEnvironmentSafely()
+ {
+ Environment environment = _environment;
+ if (environment != null)
+ {
+ try
+ {
+ if (environment.isValid())
+ {
+ try
+ {
+ closeDatabases();
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Ignoring an exception whilst closing databases", e);
+ }
+ }
+ environment.close();
+ }
+ catch (EnvironmentFailureException efe)
+ {
+ LOGGER.warn("Ignoring an exception whilst closing environment", efe);
+ }
+ }
+ }
+
+ private void closeDatabases()
+ {
+ RuntimeException firstThrownException = null;
+ for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+ {
+ DatabaseHolder databaseHolder = entry.getValue();
+ Database database = databaseHolder.getDatabase();
+ if (database != null)
+ {
+ try
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
+ }
+
+ database.close();
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e);
+ if (firstThrownException == null)
+ {
+ firstThrownException = e;
+ }
+ }
+ finally
+ {
+ databaseHolder.setDatabase(null);
+ }
+ }
+ }
+ if (firstThrownException != null)
+ {
+ throw firstThrownException;
+ }
+ }
+
+ private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread)
+ {
+ String groupName = _configuration.getGroupName();
+ String helperHostPort = _configuration.getHelperHostPort();
+ String hostPort = _configuration.getHostPort();
+ Map<String, String> environmentParameters = _configuration.getParameters();
+ Map<String, String> replicationEnvironmentParameters = _configuration.getReplicationParameters();
+ boolean designatedPrimary = _configuration.isDesignatedPrimary();
+ int priority = _configuration.getPriority();
+ int quorumOverride = _configuration.getQuorumOverride();
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Creating environment");
+ LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath());
+ LOGGER.info("Group name " + groupName);
+ LOGGER.info("Node name " + _configuration.getName());
+ LOGGER.info("Node host port " + hostPort);
+ LOGGER.info("Helper host port " + helperHostPort);
+ LOGGER.info("Durability " + _durability);
+ LOGGER.info("Coalescing sync " + _coalescingSync);
+ LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
+ LOGGER.info("Node priority " + priority);
+ LOGGER.info("Quorum override " + quorumOverride);
+ }
+
+ Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
+ if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty())
+ {
+ replicationEnvironmentSettings.putAll(replicationEnvironmentParameters);
+ }
+ Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+ if (environmentParameters != null && !environmentParameters.isEmpty())
+ {
+ environmentSettings.putAll(environmentParameters);
+ }
+
+ ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort);
+ replicationConfig.setHelperHosts(helperHostPort);
+ replicationConfig.setDesignatedPrimary(designatedPrimary);
+ replicationConfig.setNodePriority(priority);
+ replicationConfig.setElectableGroupSizeOverride(quorumOverride);
+
+ for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet())
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ }
+ replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+ envConfig.setDurability(_durability);
+
+ for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ }
+ envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+
+ if (createEnvironmentInSeparateThread)
+ {
+ return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig);
+ }
+ else
+ {
+ return createEnvironment(_environmentDirectory, envConfig, replicationConfig);
+ }
+ }
+
+ private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig,
+ final ReplicationConfig replicationConfig)
+ {
+ Future<ReplicatedEnvironment> environmentFuture = _environmentJobExecutor.submit(new Callable<ReplicatedEnvironment>(){
+ @Override
+ public ReplicatedEnvironment call() throws Exception
+ {
+ String originalThreadName = Thread.currentThread().getName();
+ try
+ {
+ return createEnvironment(environmentPathFile, envConfig, replicationConfig);
+ }
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }});
+
+ long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ try
+ {
+ return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Environment creation was interrupted", e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException("Unexpected exception on environment creation", e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ throw new RuntimeException("JE environment has not been created in due time");
+ }
+ }
+
+ private ReplicatedEnvironment createEnvironment(File environmentPathFile, EnvironmentConfig envConfig,
+ final ReplicationConfig replicationConfig)
+ {
+ ReplicatedEnvironment environment = null;
+ try
+ {
+ environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+ }
+ catch (final InsufficientLogException ile)
+ {
+ LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+ NetworkRestore restore = new NetworkRestore();
+ NetworkRestoreConfig config = new NetworkRestoreConfig();
+ config.setRetainLogFiles(false);
+ restore.execute(ile, config);
+ environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+ }
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
+ }
+ return environment;
+ }
+
+ @Override
+ public Committer createCommitter(String name)
+ {
+ if (_coalescingSync)
+ {
+ return new CoalescingCommiter(name, this);
+ }
+ else
+ {
+ return Committer.IMMEDIATE_FUTURE_COMMITTER;
+ }
+ }
+
+ public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+ {
+ if (repNode == null)
+ {
+ throw new IllegalArgumentException("Node cannot be null");
+ }
+ return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState();
+ }
+
+ // For testing only
+ int getNumberOfElectableGroupMembers()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not opened");
+ }
+ return _environment.getGroup().getElectableNodes().size();
+ }
+
+ private class RemoteNodeStateLearner implements Callable<Void>
+ {
+ private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap();
+ @Override
+ public Void call()
+ {
+ final Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>();
+ try
+ {
+ Set<Future<Void>> futures = new HashSet<Future<Void>>();
+
+ for (final ReplicationNode node : _environment.getGroup().getElectableNodes())
+ {
+ Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ DbPing ping = new DbPing(node, _configuration.getGroupName(), REMOTE_NODE_MONITOR_INTERVAL);
+ ReplicatedEnvironment.State nodeState;
+ try
+ {
+ nodeState = ping.getNodeState().getNodeState();
+ }
+ catch (IOException e)
+ {
+ nodeState = ReplicatedEnvironment.State.UNKNOWN;
+ }
+ catch (ServiceConnectFailedException e)
+ {
+ nodeState = ReplicatedEnvironment.State.UNKNOWN;
+ }
+
+ currentGroupState.put(node.getName(), nodeState);
+ return null;
+ }
+ });
+ futures.add(future);
+ }
+
+ for (Future<Void> future : futures)
+ {
+ try
+ {
+ future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
+ future.cancel(true);
+ }
+ }
+
+ if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+ {
+ boolean stateChanged = !_previousGroupState.equals(currentGroupState);
+ _previousGroupState = currentGroupState;
+ if (stateChanged && State.OPEN == _state.get())
+ {
+ new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+ }
+ }
+ }
+ finally
+ {
+ _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ return null;
+ }
+ }
+ public static enum State
+ {
+ OPENING,
+ OPEN,
+ RESTARTING,
+ CLOSING,
+ CLOSED
+ }
+
+ private static class DatabaseHolder
+ {
+ private final DatabaseConfig _config;
+ private Database _database;
+
+ public DatabaseHolder(DatabaseConfig config)
+ {
+ _config = config;
+ }
+
+ public Database getDatabase()
+ {
+ return _database;
+ }
+
+ public void setDatabase(Database database)
+ {
+ _database = database;
+ }
+
+ public DatabaseConfig getConfig()
+ {
+ return _config;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]";
+ }
+
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
new file mode 100644
index 0000000000..cd53afe891
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
+
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+
+public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
+{
+
+ private static final int DEFAULT_NODE_PRIORITY = 1;
+ private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
+ ReplicaAckPolicy.SIMPLE_MAJORITY);
+ private static final boolean DEFAULT_COALESCING_SYNC = true;
+
+
+
+ @Override
+ public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore)
+ {
+ ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
+ {
+ @Override
+ public boolean isDesignatedPrimary()
+ {
+ return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false);
+ }
+
+ @Override
+ public boolean isCoalescingSync()
+ {
+ return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC);
+ }
+
+ @Override
+ public String getStorePath()
+ {
+ return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ }
+
+ @Override
+ public Map<String, String> getParameters()
+ {
+ return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig");
+ }
+
+ @Override
+ public Map<String, String> getReplicationParameters()
+ {
+ return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig");
+ }
+
+ @Override
+ public int getQuorumOverride()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getPriority()
+ {
+ return DEFAULT_NODE_PRIORITY;
+ }
+
+
+
+ @Override
+ public String getName()
+ {
+ return (String)virtualHost.getAttribute("haNodeName");
+ }
+
+ @Override
+ public String getHostPort()
+ {
+ return (String)virtualHost.getAttribute("haNodeAddress");
+ }
+
+ @Override
+ public String getHelperHostPort()
+ {
+ return (String)virtualHost.getAttribute("haHelperAddress");
+ }
+
+ @Override
+ public String getGroupName()
+ {
+ return (String)virtualHost.getAttribute("haGroupName");
+ }
+
+ @Override
+ public String getDurability()
+ {
+ return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability");
+ }
+ };
+ return new ReplicatedEnvironmentFacade(configuration);
+
+ }
+
+ @Override
+ public String getType()
+ {
+ return ReplicatedEnvironmentFacade.TYPE;
+ }
+
+ private boolean convertBoolean(final Object value, boolean defaultValue)
+ {
+ if(value instanceof Boolean)
+ {
+ return (Boolean) value;
+ }
+ else if(value instanceof String)
+ {
+ return Boolean.valueOf((String) value);
+ }
+ else if(value == null)
+ {
+ return defaultValue;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Boolean");
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index 4d536a2f95..7852e2d703 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -21,11 +21,13 @@
package org.apache.qpid.server.store.berkeleydb.upgrade;
import com.sleepycat.je.Cursor;
+
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -38,6 +40,8 @@ import com.sleepycat.je.OperationStatus;
public class Upgrader
{
+ private static final Logger LOGGER = Logger.getLogger(Upgrader.class);
+
static final String VERSION_DB_NAME = "DB_VERSION";
private Environment _environment;
@@ -63,7 +67,8 @@ public class Upgrader
if(versionDb.count() == 0L)
{
- int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion();
+
+ int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(sourceVersion, key);
DatabaseEntry value = new DatabaseEntry();
@@ -73,11 +78,17 @@ public class Upgrader
}
int version = getSourceVersion(versionDb);
- if(version > AbstractBDBMessageStore.VERSION)
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Source message store version is " + version);
+ }
+
+ if(version > BDBMessageStore.VERSION)
{
throw new StoreException("Database version " + version
+ " is higher than the most recent known version: "
- + AbstractBDBMessageStore.VERSION);
+ + BDBMessageStore.VERSION);
}
performUpgradeFromVersion(version, versionDb);
}
@@ -124,8 +135,9 @@ public class Upgrader
}
void performUpgradeFromVersion(int sourceVersion, Database versionDb)
+ throws StoreException
{
- while(sourceVersion != AbstractBDBMessageStore.VERSION)
+ while(sourceVersion != BDBMessageStore.VERSION)
{
upgrade(sourceVersion, ++sourceVersion);
DatabaseEntry key = new DatabaseEntry();
@@ -136,7 +148,7 @@ public class Upgrader
}
}
- void upgrade(final int fromVersion, final int toVersion)
+ void upgrade(final int fromVersion, final int toVersion) throws StoreException
{
try
{
@@ -177,7 +189,7 @@ public class Upgrader
private int identifyOldStoreVersion() throws DatabaseException
{
- int version = 0;
+ int version = BDBMessageStore.VERSION;
for (String databaseName : _environment.getDatabaseNames())
{
if (databaseName.contains("_v"))
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
deleted file mode 100644
index c2b3aeab3e..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.net.InetAddress;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.FileUtils;
-
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class BDBHAMessageStoreTest extends QpidTestCase
-{
- private static final String TEST_LOG_FILE_MAX = "1000000";
- private static final String TEST_ELECTION_RETRIES = "1000";
- private static final String TEST_NUMBER_OF_THREADS = "10";
- private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999";
- private String _groupName;
- private String _workDir;
- private int _masterPort;
- private String _host;
- private XMLConfiguration _configXml;
- private VirtualHost _virtualHost;
- private org.apache.qpid.server.model.VirtualHost _modelVhost;
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- _workDir = TMP_FOLDER + File.separator + getName();
- _host = InetAddress.getByName("localhost").getHostAddress();
- _groupName = "group" + getName();
- _masterPort = -1;
-
- FileUtils.delete(new File(_workDir), true);
- _configXml = new XMLConfiguration();
- _modelVhost = mock(org.apache.qpid.server.model.VirtualHost.class);
-
-
- BrokerTestHelper.setUp();
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- if (_virtualHost != null)
- {
- _virtualHost.close();
- }
- FileUtils.delete(new File(_workDir), true);
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- public void testSetSystemConfiguration() throws Exception
- {
- // create virtual host configuration, registry and host instance
- addVirtualHostConfiguration();
- String vhostName = "test" + _masterPort;
- VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock());
-
- _virtualHost = BrokerTestHelper.createVirtualHost(configuration,new VirtualHostRegistry(new EventLogger()),_modelVhost);
- BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore();
-
- // test whether JVM system settings were applied
- Environment env = store.getEnvironment();
- assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
- assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
-
- ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
- assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
- assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
- }
-
- private void addVirtualHostConfiguration() throws Exception
- {
- int port = findFreePort();
- if (_masterPort == -1)
- {
- _masterPort = port;
- }
- String nodeName = getNodeNameForNodeAt(port);
-
- String vhostName = "test" + port;
- String vhostPrefix = "virtualhosts.virtualhost." + vhostName;
-
- _configXml.addProperty("virtualhosts.virtualhost.name", vhostName);
- _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE);
-
- when(_modelVhost.getAttribute(eq(_modelVhost.STORE_PATH))).thenReturn(_workDir + File.separator
- + port);
- when(_modelVhost.getAttribute(eq("haGroupName"))).thenReturn(_groupName);
- when(_modelVhost.getAttribute(eq("haNodeName"))).thenReturn(nodeName);
- when(_modelVhost.getAttribute(eq("haNodeAddress"))).thenReturn(getNodeHostPortForNodeAt(port));
- when(_modelVhost.getAttribute(eq("haHelperAddress"))).thenReturn(getHelperHostPort());
-
- Map<String,String> bdbEnvConfig = new HashMap<String,String>();
- bdbEnvConfig.put(EnvironmentConfig.CLEANER_THREADS, TEST_NUMBER_OF_THREADS);
- bdbEnvConfig.put(EnvironmentConfig.LOG_FILE_MAX, TEST_LOG_FILE_MAX);
-
- when(_modelVhost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(bdbEnvConfig);
-
- Map<String,String> repConfig = new HashMap<String,String>();
- repConfig.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, TEST_ELECTION_RETRIES);
- repConfig.put(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, TEST_ENV_CONSISTENCY_TIMEOUT);
- when(_modelVhost.getAttribute(eq("haReplicationConfig"))).thenReturn(repConfig);
-
- }
-
- private String getNodeNameForNodeAt(final int bdbPort)
- {
- return "node" + getName() + bdbPort;
- }
-
- private String getNodeHostPortForNodeAt(final int bdbPort)
- {
- return _host + ":" + bdbPort;
- }
-
- private String getHelperHostPort()
- {
- if (_masterPort == -1)
- {
- throw new IllegalStateException("Helper port not yet assigned.");
- }
- return _host + ":" + _masterPort;
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
index 730001d849..385681446a 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
@@ -22,20 +22,14 @@ package org.apache.qpid.server.store.berkeleydb;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
public class MessageStoreCreatorTest extends QpidTestCase
{
- private static final String[] STORE_TYPES = {BDBMessageStore.TYPE};
-
public void testMessageStoreCreator()
{
MessageStoreCreator messageStoreCreator = new MessageStoreCreator();
- for (String type : STORE_TYPES)
- {
- MessageStore store = messageStoreCreator.createMessageStore(type);
- assertNotNull("Store of type " + type + " is not created", store);
- }
- }
-}
+ String type = new BDBMessageStoreFactory().getType();
+ MessageStore store = messageStoreCreator.createMessageStore(type);
+ assertNotNull("Store of type " + type + " is not created", store);
+ }}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
new file mode 100644
index 0000000000..b19e18b204
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+
+public class StandardEnvironmentFacadeTest extends QpidTestCase
+{
+ protected File _storePath;
+ protected EnvironmentFacade _environmentFacade;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName());
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ if (_environmentFacade != null)
+ {
+ _environmentFacade.close();
+ }
+ }
+ finally
+ {
+ if (_storePath != null)
+ {
+ FileUtils.delete(_storePath, true);
+ }
+ }
+ }
+
+ public void testEnvironmentFacade() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ assertNotNull("Environment should not be null", ef);
+ Environment e = ef.getEnvironment();
+ assertTrue("Environment is not valid", e.isValid());
+ }
+
+ public void testClose() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ ef.close();
+ Environment e = ef.getEnvironment();
+
+ assertNull("Environment should be null after facade close", e);
+ }
+
+ public void testOpenDatabases() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1", "test2");
+ Database test1 = ef.getOpenDatabase("test1");
+ Database test2 = ef.getOpenDatabase("test2");
+
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+ }
+
+ public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1");
+ Database test1 = ef.getOpenDatabase("test1");
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ try
+ {
+ ef.getOpenDatabase("test2");
+ fail("An exception should be thrown for the non existing database");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage());
+ }
+ }
+
+ EnvironmentFacade getEnvironmentFacade() throws Exception
+ {
+ if (_environmentFacade == null)
+ {
+ _environmentFacade = createEnvironmentFacade();
+ }
+ return _environmentFacade;
+ }
+
+ EnvironmentFacade createEnvironmentFacade()
+ {
+ return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
new file mode 100644
index 0000000000..a05a30b459
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.RecovererProvider;
+import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class VirtualHostTest extends QpidTestCase
+{
+
+ private Broker _broker;
+ private StatisticsGatherer _statisticsGatherer;
+ private RecovererProvider _recovererProvider;
+ private File _configFile;
+ private File _bdbStorePath;
+ private VirtualHost _host;
+ private ConfigurationEntryStore _store;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _store = mock(ConfigurationEntryStore.class);
+ _broker = BrokerTestHelper.createBrokerMock();
+ TaskExecutor taslExecutor = mock(TaskExecutor.class);
+ when(taslExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_broker.getTaskExecutor()).thenReturn(taslExecutor);
+
+
+ _statisticsGatherer = mock(StatisticsGatherer.class);
+
+ _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis());
+ _bdbStorePath.deleteOnExit();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (_host != null)
+ {
+ _host.setDesiredState(_host.getState(), State.STOPPED);
+ }
+ }
+ finally
+ {
+ if (_configFile != null)
+ {
+ _configFile.delete();
+ }
+ if (_bdbStorePath != null)
+ {
+ FileUtils.delete(_bdbStorePath, true);
+ }
+ super.tearDown();
+ }
+ }
+
+
+ public void testCreateBdbVirtualHostFromConfigurationFile()
+ {
+ String hostName = getName();
+ long logFileMax = 2000000;
+ _host = createHostFromConfiguration(hostName, logFileMax);
+ _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType());
+ assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+ EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig();
+ assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+ }
+
+ public void testCreateBdbHaVirtualHostFromConfigurationFile()
+ {
+ String hostName = getName();
+
+ String repStreamTimeout = "2 h";
+ String nodeName = "node";
+ String groupName = "group";
+ String nodeHostPort = "localhost:" + findFreePort();
+ String helperHostPort = nodeHostPort;
+ String durability = "NO_SYNC,SYNC,NONE";
+ _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout);
+ _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
+ assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+ ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
+ ReplicationConfig repConfig = environment.getRepConfig();
+ assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME));
+ assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME));
+ assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT));
+ assertEquals("Unexpected JE replication nodeHostPort", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS));
+ assertEquals("Unexpected JE replication nodeHostPort", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY));
+ assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
+ }
+
+ private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children)
+ {
+ ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes,
+ children, _store);
+
+ return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
+ }
+
+ private VirtualHost createHost(Map<String, Object> attributes)
+ {
+ return createHost(attributes, Collections.<UUID> emptySet());
+ }
+
+ private VirtualHost createHostFromConfiguration(String hostName, long logFileMax)
+ {
+ String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+ + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+ + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+ + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>"
+ + "</store>"
+ + "</" + hostName + "></virtualhost></virtualhosts>";
+ Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+ return createHost(attributes);
+ }
+
+
+ private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout)
+ {
+ String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+ + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>"
+ + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+ + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+ + "<highAvailability>"
+ + "<groupName>" + groupName + "</groupName>"
+ + "<nodeName>" + nodeName + "</nodeName>"
+ + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>"
+ + "<helperHostPort>" + helperHostPort + "</helperHostPort>"
+ + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>"
+ + "</highAvailability>"
+ + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>"
+ + "</store>"
+ + "</" + hostName + "></virtualhost></virtualhosts>";
+ Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+ return createHost(attributes);
+ }
+
+ private Map<String, Object> writeConfigAndGenerateAttributes(String content)
+ {
+ _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath());
+ return attributes;
+ }
+}
+
+ \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
new file mode 100644
index 0000000000..cd7dd69c46
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -0,0 +1,336 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
+{
+
+ private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
+ private static final int LISTENER_TIMEOUT = 5;
+ private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
+ private static final String TEST_GROUP_NAME = "testGroupName";
+ private static final String TEST_NODE_NAME = "testNodeName";
+ private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
+ private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
+ private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
+ private static final boolean TEST_DESIGNATED_PRIMARY = false;
+ private static final boolean TEST_COALESCING_SYNC = true;
+ private static final int TEST_PRIORITY = 1;
+ private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
+
+ private File _storePath;
+ private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
+ private VirtualHost _virtualHost = mock(VirtualHost.class);
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ TaskExecutor taskExecutor = mock(TaskExecutor.class);
+ when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
+
+ _storePath = TestFileUtils.createTestDirectory("bdb", true);
+
+ setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ for (EnvironmentFacade ef : _nodes.values())
+ {
+ ef.close();
+ }
+ }
+ finally
+ {
+ try
+ {
+ if (_storePath != null)
+ {
+ FileUtils.delete(_storePath, true);
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+ }
+ public void testEnvironmentFacade() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ assertNotNull("Environment should not be null", ef);
+ Environment e = ef.getEnvironment();
+ assertTrue("Environment is not valid", e.isValid());
+ }
+
+ public void testClose() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ ef.close();
+ Environment e = ef.getEnvironment();
+
+ assertNull("Environment should be null after facade close", e);
+ }
+
+ public void testOpenDatabases() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1", "test2");
+ Database test1 = ef.getOpenDatabase("test1");
+ Database test2 = ef.getOpenDatabase("test2");
+
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+ }
+
+ public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1");
+ Database test1 = ef.getOpenDatabase("test1");
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ try
+ {
+ ef.getOpenDatabase("test2");
+ fail("An exception should be thrown for the non existing database");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage());
+ }
+ }
+
+ public void testGetGroupName() throws Exception
+ {
+ assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName());
+ }
+
+ public void testGetNodeName() throws Exception
+ {
+ assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
+ }
+
+ public void testLastKnownReplicationTransactionId() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId();
+ assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0);
+ }
+
+ public void testGetNodeHostPort() throws Exception
+ {
+ assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
+ }
+
+ public void testGetHelperHostPort() throws Exception
+ {
+ assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort());
+ }
+
+ public void testGetDurability() throws Exception
+ {
+ assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability());
+ }
+
+ public void testIsCoalescingSync() throws Exception
+ {
+ assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync());
+ }
+
+ public void testGetNodeState() throws Exception
+ {
+ assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
+ }
+
+
+ public void testPriority() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority());
+ Future<Void> future = facade.setPriority(TEST_PRIORITY + 1);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority());
+ }
+
+ public void testDesignatedPrimary() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ }
+
+ public void testElectableGroupSizeOverride() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride());
+ Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
+ }
+
+ public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
+ {
+ final CountDownLatch masterLatch = new CountDownLatch(1);
+ final AtomicInteger masterStateChangeCount = new AtomicInteger();
+ final CountDownLatch unknownLatch = new CountDownLatch(1);
+ final AtomicInteger unknownStateChangeCount = new AtomicInteger();
+ StateChangeListener stateChangeListener = new StateChangeListener()
+ {
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ if (stateChangeEvent.getState() == State.MASTER)
+ {
+ masterStateChangeCount.incrementAndGet();
+ masterLatch.countDown();
+ }
+ else if (stateChangeEvent.getState() == State.UNKNOWN)
+ {
+ unknownStateChangeCount.incrementAndGet();
+ unknownLatch.countDown();
+ }
+ }
+ };
+
+ addNode(State.MASTER, stateChangeListener);
+ assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String node2NodeHostPort = "localhost:" + replica2Port;
+
+ ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
+
+ // close replicas
+ replica1.close();
+ replica2.close();
+
+ assertTrue("Environment should be recreated and go into unknown state",
+ unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get());
+ assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
+ }
+
+ public void testCloseStateTransitions() throws Exception
+ {
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
+
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
+ replicatedEnvironmentFacade.close();
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
+ }
+
+ private ReplicatedEnvironmentFacade createMaster() throws Exception
+ {
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener);
+ assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ return env;
+ }
+
+ private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception
+ {
+ TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
+ ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
+ assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange);
+ return replicaEnvironmentFacade;
+ }
+
+ private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
+ State desiredState, StateChangeListener stateChangeListener)
+ {
+ ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
+ ref.setStateChangeListener(stateChangeListener);
+ _nodes.put(nodeName, ref);
+ return ref;
+ }
+
+ private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener)
+ {
+ return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener);
+ }
+
+ private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
+ {
+ ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
+ when(node.getName()).thenReturn(nodeName);
+ when(node.getHostPort()).thenReturn(nodeHostPort);
+ when(node.isDesignatedPrimary()).thenReturn(designatedPrimary);
+ when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
+ when(node.getPriority()).thenReturn(TEST_PRIORITY);
+ when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
+ when(node.getDurability()).thenReturn(TEST_DURABILITY);
+ when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC);
+
+ Map<String, String> repConfig = new HashMap<String, String>();
+ repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
+ repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+ when(node.getReplicationParameters()).thenReturn(repConfig);
+ when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
+ return node;
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
new file mode 100644
index 0000000000..0870191b35
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+class TestStateChangeListener implements StateChangeListener
+{
+ private final Set<State> _expectedStates;
+ private final CountDownLatch _latch;
+ private final AtomicReference<State> _currentActualState = new AtomicReference<State>();
+
+ public TestStateChangeListener(State expectedState)
+ {
+ this(Collections.singleton(expectedState));
+ }
+
+ public TestStateChangeListener(Set<State> expectedStates)
+ {
+ _expectedStates = new HashSet<State>(expectedStates);
+ _latch = new CountDownLatch(1);
+ }
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ _currentActualState.set(stateChangeEvent.getState());
+ if (_expectedStates.contains(stateChangeEvent.getState()))
+ {
+ _latch.countDown();
+ }
+ }
+
+ public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException
+ {
+ return _latch.await(timeout, timeUnit);
+ }
+
+ public State getCurrentActualState()
+ {
+ return _currentActualState.get();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
index 400ac12792..810f4a1fca 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
@@ -26,7 +26,7 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.OperationStatus;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
@@ -94,7 +94,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
catch(ServerScopedRuntimeException ex)
{
assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: "
- + AbstractBDBMessageStore.VERSION, ex.getMessage());
+ + BDBMessageStore.VERSION, ex.getMessage());
}
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index bd0411619e..04817ad36c 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -20,11 +20,15 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -41,6 +45,8 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreTest;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -73,7 +79,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
MessageStore store = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = assertBDBStore(store);
// Create content ByteBuffers.
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -126,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
/*
* reload the store only (read-only)
*/
- AbstractBDBMessageStore readOnlyStore = reloadStore(bdbStore);
+ BDBMessageStore readOnlyStore = reloadStore(bdbStore);
/*
* Read back and validate the 0-8 message metadata and content
@@ -225,14 +231,17 @@ public class BDBMessageStoreTest extends MessageStoreTest
* Use this method instead of reloading the virtual host like other tests in order
* to avoid the recovery handler deleting the message for not being on a queue.
*/
- private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception
+ private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
{
messageStore.close();
- AbstractBDBMessageStore newStore = new BDBMessageStore();
- newStore.configure(getVirtualHostModel(),true);
+ BDBMessageStore newStore = new BDBMessageStore();
+
+ MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
+ when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
+ newStore.configureMessageStore(getVirtualHostModel(), recoveryHandler, null);
- newStore.startWithNoRecover();
+ newStore.activate();
return newStore;
}
@@ -287,7 +296,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
public void testGetContentWithOffset() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -347,7 +356,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
public void testMessageCreationAndRemoval() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -372,12 +381,12 @@ public class BDBMessageStoreTest extends MessageStoreTest
assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
- private AbstractBDBMessageStore assertBDBStore(MessageStore store)
+ private BDBMessageStore assertBDBStore(MessageStore store)
{
assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
- return (AbstractBDBMessageStore) store;
+ return (BDBMessageStore) store;
}
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
@@ -410,7 +419,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
MessageStore log = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ BDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
@@ -460,7 +469,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
MessageStore log = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ BDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
@@ -506,7 +515,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
public void testOnDelete() throws Exception
{
MessageStore log = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ BDBMessageStore bdbStore = assertBDBStore(log);
String storeLocation = bdbStore.getStoreLocation();
File location = new File(storeLocation);
@@ -529,7 +538,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
MessageStore log = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ BDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index 4b50121a7a..e8d18971ad 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -38,6 +38,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -143,7 +144,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
CompositeData row = groupMembers.get(new Object[] {nodeName});
assertNotNull("Table does not contain row for node name " + nodeName, row);
- assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ assertEquals(nodeHostPort, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java
new file mode 100644
index 0000000000..4f1f830fd0
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.ThreadFactory;
+
+public final class DaemonThreadFactory implements ThreadFactory
+{
+ private String _threadName;
+ public DaemonThreadFactory(String threadName)
+ {
+ _threadName = threadName;
+ }
+
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ Thread thread = new Thread(r, _threadName);
+ thread.setDaemon(true);
+ return thread;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 7a4f92f0ca..9fc95c1861 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -71,7 +72,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
_store = createStore();
((DurableConfigurationStore)_store).configureConfigStore(vhost, null);
- _store.configureMessageStore(vhost, mock(MessageStoreRecoveryHandler.class), null);
+ MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
+ when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
+ _store.configureMessageStore(vhost, recoveryHandler, null);
+ _store.activate();
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
@@ -89,7 +93,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
{
if (_store != null)
{
- _store.close();
+ // _store.close();
}
FileUtils.delete(_storeLocation, true);
}