diff options
Diffstat (limited to 'qpid/java')
31 files changed, 4651 insertions, 2866 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java index aa4ddd8181..f36c1ecc6f 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -36,19 +36,12 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObject; -import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; /** * Management mbean for BDB HA. - * <p> - * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is - * currently arranged through OSGI using the <b>fragment</b> feature so that this bundle shares the - * same classloader as broker-plugins-management-jmx. See the <b>Fragment-Host:</b> header within the MANIFEST.MF - * of this bundle. - * </p> */ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore { @@ -63,7 +56,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M try { GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING}; - final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT}; + final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT}; final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "}; GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member", itemNames, @@ -71,7 +64,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M GROUP_MEMBER_ATTRIBUTE_TYPES ); GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers", GROUP_MEMBER_ROW, - new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME}); + new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME}); } catch (final OpenDataException ode) { @@ -79,44 +72,46 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } } - private final BDBHAMessageStore _store; + private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; + private final String _objectName; - protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException + protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException { super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry()); - LOGGER.debug("Creating BDBHAMessageStoreManagerMBean"); - _store = store; + LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName); + _replicatedEnvironmentFacade = replicatedEnvironmentFacade; + _objectName = ObjectName.quote(virtualHostName); register(); } @Override public String getObjectInstanceName() { - return ObjectName.quote(_store.getName()); + return _objectName; } @Override public String getGroupName() { - return _store.getGroupName(); + return _replicatedEnvironmentFacade.getGroupName(); } @Override public String getNodeName() { - return _store.getNodeName(); + return _replicatedEnvironmentFacade.getNodeName(); } @Override public String getNodeHostPort() { - return _store.getNodeHostPort(); + return _replicatedEnvironmentFacade.getHostPort(); } @Override public String getHelperHostPort() { - return _store.getHelperHostPort(); + return _replicatedEnvironmentFacade.getHelperHostPort(); } @Override @@ -124,7 +119,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { try { - return _store.getDurability(); + return _replicatedEnvironmentFacade.getDurability(); } catch (RuntimeException e) { @@ -137,7 +132,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M @Override public boolean getCoalescingSync() throws IOException, JMException { - return _store.isCoalescingSync(); + return _replicatedEnvironmentFacade.isCoalescingSync(); } @Override @@ -145,7 +140,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { try { - return _store.getNodeState(); + return _replicatedEnvironmentFacade.getNodeState(); } catch (RuntimeException e) { @@ -159,7 +154,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { try { - return _store.isDesignatedPrimary(); + return _replicatedEnvironmentFacade.isDesignatedPrimary(); } catch (RuntimeException e) { @@ -172,7 +167,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M public TabularData getAllNodesInGroup() throws IOException, JMException { final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE); - final List<Map<String, String>> members = _store.getGroupMembers(); + final List<Map<String, String>> members = _replicatedEnvironmentFacade.getGroupMembers(); for (Map<String, String> map : members) { @@ -187,9 +182,9 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { try { - _store.removeNodeFromGroup(nodeName); + _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName); } - catch (StoreException e) + catch (RuntimeException e) { LOGGER.error("Failed to remove node " + nodeName + " from group", e); throw new JMException(e.getMessage()); @@ -201,11 +196,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { try { - _store.setDesignatedPrimary(primary); + _replicatedEnvironmentFacade.setDesignatedPrimary(primary); } - catch (StoreException e) + catch (RuntimeException e) { - LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e); + LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e); throw new JMException(e.getMessage()); } } @@ -215,9 +210,9 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { try { - _store.updateAddress(nodeName, newHostName, newPort); + _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort); } - catch(StoreException e) + catch(RuntimeException e) { LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e); throw new JMException(e.getMessage()); diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java index 0492350a25..16199d30a3 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -28,11 +28,12 @@ import org.apache.qpid.server.jmx.MBeanProvider; import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; /** * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual - * host and of type {@link BDBHAMessageStore#TYPE}. + * host and of type {@link ReplicatedEnvironmentFacade#TYPE}. * */ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider @@ -48,7 +49,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider public boolean isChildManageableByMBean(ConfiguredObject child) { return (child instanceof VirtualHost - && BDBHAMessageStore.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE))); + && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE))); } @Override @@ -56,14 +57,15 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider { VirtualHost virtualHostChild = (VirtualHost) child; - BDBHAMessageStore messageStore = (BDBHAMessageStore) virtualHostChild.getMessageStore(); + BDBMessageStore messageStore = (BDBMessageStore) virtualHostChild.getMessageStore(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Creating mBean for child " + child); } - return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade(); + return new BDBHAMessageStoreManagerMBean(virtualHostChild.getName(), replicatedEnvironmentFacade, (ManagedObject) parent); } @Override diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index 0d963ebdae..fa16d1061a 100644 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -37,10 +37,9 @@ import javax.management.openmbean.TabularData; import junit.framework.TestCase; -import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObjectRegistry; -import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; public class BDBHAMessageStoreManagerMBeanTest extends TestCase { @@ -53,7 +52,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase private static final String TEST_STORE_NAME = "testStoreName"; private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false; - private BDBHAMessageStore _store; + private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; private BDBHAMessageStoreManagerMBean _mBean; private AMQManagedObject _mBeanParent; @@ -62,10 +61,10 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase { super.setUp(); - _store = mock(BDBHAMessageStore.class); + _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class); _mBeanParent = mock(AMQManagedObject.class); when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class)); - _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent); + _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacade, _mBeanParent); } @Override @@ -76,64 +75,62 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase public void testObjectName() throws Exception { - when(_store.getName()).thenReturn(TEST_STORE_NAME); - String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME); assertEquals(expectedObjectName, _mBean.getObjectName().toString()); } public void testGroupName() throws Exception { - when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME); + when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(TEST_GROUP_NAME); assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME)); } public void testNodeName() throws Exception { - when(_store.getNodeName()).thenReturn(TEST_NODE_NAME); + when(_replicatedEnvironmentFacade.getNodeName()).thenReturn(TEST_NODE_NAME); assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME)); } public void testNodeHostPort() throws Exception { - when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT); + when(_replicatedEnvironmentFacade.getHostPort()).thenReturn(TEST_NODE_HOST_PORT); assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT)); } public void testHelperHostPort() throws Exception { - when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT); + when(_replicatedEnvironmentFacade.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT); assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT)); } public void testDurability() throws Exception { - when(_store.getDurability()).thenReturn(TEST_DURABILITY); + when(_replicatedEnvironmentFacade.getDurability()).thenReturn(TEST_DURABILITY); assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY)); } public void testCoalescingSync() throws Exception { - when(_store.isCoalescingSync()).thenReturn(true); + when(_replicatedEnvironmentFacade.isCoalescingSync()).thenReturn(true); assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC)); } public void testNodeState() throws Exception { - when(_store.getNodeState()).thenReturn(TEST_NODE_STATE); + when(_replicatedEnvironmentFacade.getNodeState()).thenReturn(TEST_NODE_STATE); assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE)); } public void testDesignatedPrimaryFlag() throws Exception { - when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); + when(_replicatedEnvironmentFacade.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY)); } @@ -141,29 +138,29 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase public void testGroupMembersForGroupWithOneNode() throws Exception { List<Map<String, String>> members = Collections.singletonList(createTestNodeResult()); - when(_store.getGroupMembers()).thenReturn(members); + when(_replicatedEnvironmentFacade.getGroupMembers()).thenReturn(members); final TabularData resultsTable = _mBean.getAllNodesInGroup(); - assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT); + assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT); final int numberOfDataRows = resultsTable.size(); assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows); final CompositeData row = (CompositeData) resultsTable.values().iterator().next(); - assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME)); - assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME)); + assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT)); } public void testRemoveNodeFromReplicationGroup() throws Exception { _mBean.removeNodeFromGroup(TEST_NODE_NAME); - verify(_store).removeNodeFromGroup(TEST_NODE_NAME); + verify(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME); } public void testRemoveNodeFromReplicationGroupWithError() throws Exception { - doThrow(new StoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME); + doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME); try { @@ -180,12 +177,12 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase { _mBean.setDesignatedPrimary(true); - verify(_store).setDesignatedPrimary(true); + verify(_replicatedEnvironmentFacade).setDesignatedPrimary(true); } public void testSetAsDesignatedPrimaryWithError() throws Exception { - doThrow(new StoreException("mocked exception")).when(_store).setDesignatedPrimary(true); + doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).setDesignatedPrimary(true); try { @@ -205,7 +202,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort); - verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort); + verify(_replicatedEnvironmentFacade).updateAddress(TEST_NODE_NAME, newHostName, newPort); } private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames) @@ -220,8 +217,8 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase private Map<String, String> createTestNodeResult() { Map<String, String> items = new HashMap<String, String>(); - items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); - items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); + items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); + items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); return items; } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java deleted file mode 100644 index 37fb77f547..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ /dev/null @@ -1,1867 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.IntegerBinding; -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.*; -import com.sleepycat.je.Transaction; - -import java.io.File; -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.*; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; -import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; -import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.entry.Xid; -import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; -import org.apache.qpid.util.FileUtils; - -public abstract class AbstractBDBMessageStore implements MessageStore, DurableConfigurationStore -{ - private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class); - - private static final int LOCK_RETRY_ATTEMPTS = 5; - - public static final int VERSION = 7; - - private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() - {{ - put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); - put(EnvironmentConfig.STATS_COLLECT, "false"); // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84 - }}); - - private final AtomicBoolean _closed = new AtomicBoolean(false); - - private Environment _environment; - - private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; - private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; - private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; - private static String DELIVERYDB_NAME = "QUEUE_ENTRIES"; - private static String BRIDGEDB_NAME = "BRIDGES"; - private static String LINKDB_NAME = "LINKS"; - private static String XIDDB_NAME = "XIDS"; - private static String CONFIG_VERSION_DB = "CONFIG_VERSION"; - - private Database _configuredObjectsDb; - private Database _configVersionDb; - private Database _messageMetaDataDb; - private Database _messageContentDb; - private Database _deliveryDb; - private Database _bridgeDb; - private Database _linkDb; - private Database _xidDb; - - /* ======= - * Schema: - * ======= - * - * Queue: - * name(AMQShortString) - name(AMQShortString), owner(AMQShortString), - * arguments(FieldTable encoded as binary), exclusive (boolean) - * - * Exchange: - * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean) - * - * Binding: - * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString), - * arguments (FieldTable encoded as binary) - 0 (zero) - * - * QueueEntry: - * queueName(AMQShortString), messageId (long) - 0 (zero) - * - * Message (MetaData): - * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary) - * - * Message (Content): - * messageId (long), byteOffset (integer) - dataLength(integer), data(binary) - */ - - private final AtomicLong _messageId = new AtomicLong(0); - - protected final StateManager _stateManager; - - private MessageStoreRecoveryHandler _messageRecoveryHandler; - - private TransactionLogRecoveryHandler _tlogRecoveryHandler; - - private ConfigurationRecoveryHandler _configRecoveryHandler; - - private long _totalStoreSize; - private boolean _limitBusted; - private long _persistentSizeLowThreshold; - private long _persistentSizeHighThreshold; - - private final EventManager _eventManager = new EventManager(); - private String _storeLocation; - - private Map<String, String> _envConfigMap; - private VirtualHost _virtualHost; - - public AbstractBDBMessageStore() - { - _stateManager = new StateManager(_eventManager); - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - _eventManager.addEventListener(eventListener, events); - } - - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) - { - _stateManager.attainState(State.INITIALISING); - - _configRecoveryHandler = recoveryHandler; - _virtualHost = virtualHost; - } - - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) - { - if(_stateManager.isInState(State.INITIAL)) - { - // Is acting as a message store, but not a durable config store - _stateManager.attainState(State.INITIALISING); - } - - _messageRecoveryHandler = messageRecoveryHandler; - _tlogRecoveryHandler = tlogRecoveryHandler; - _virtualHost = virtualHost; - - completeInitialisation(); - } - - private void completeInitialisation() - { - configure(_virtualHost); - - _stateManager.attainState(State.INITIALISED); - } - - public synchronized void activate() - { - // check if acting as a durable config store, but not a message store - if(_stateManager.isInState(State.INITIALISING)) - { - completeInitialisation(); - } - _stateManager.attainState(State.ACTIVATING); - - if(_configRecoveryHandler != null) - { - recoverConfig(_configRecoveryHandler); - } - if(_messageRecoveryHandler != null) - { - recoverMessages(_messageRecoveryHandler); - } - if(_tlogRecoveryHandler != null) - { - recoverQueueEntries(_tlogRecoveryHandler); - } - - _stateManager.attainState(State.ACTIVE); - } - - public org.apache.qpid.server.store.Transaction newTransaction() - { - return new BDBTransaction(); - } - - /** - * Called after instantiation in order to configure the message store. - * - * - * - * @param virtualHost The virtual host using this store - * @return whether a new store environment was created or not (to indicate whether recovery is necessary) - * - * @throws Exception If any error occurs that means the store is unable to configure itself. - */ - public void configure(VirtualHost virtualHost) - { - configure(virtualHost, _messageRecoveryHandler != null); - } - - public void configure(VirtualHost virtualHost, boolean isMessageStore) - { - String name = virtualHost.getName(); - final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name; - - String storeLocation; - if(isMessageStore) - { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - else // we are acting only as the durable config store - { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); - - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - - - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - - File environmentPath = new File(storeLocation); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - - _storeLocation = storeLocation; - - _envConfigMap = new HashMap<String, String>(); - _envConfigMap.putAll(ENVCONFIG_DEFAULTS); - - Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig"); - if(bdbEnvConfigAttr instanceof Map) - { - _envConfigMap.putAll((Map)bdbEnvConfigAttr); - } - - LOGGER.info("Configuring BDB message store"); - - setupStore(environmentPath, name); - } - - protected Map<String,String> getConfigMap(Map<String, String> defaultConfig, Configuration config, String prefix) throws ConfigurationException - { - final List<Object> argumentNames = config.getList(prefix + ".name"); - final List<Object> argumentValues = config.getList(prefix + ".value"); - final int initialSize = argumentNames.size() + defaultConfig.size(); - - final Map<String,String> attributes = new HashMap<String,String>(initialSize); - attributes.putAll(defaultConfig); - - for (int i = 0; i < argumentNames.size(); i++) - { - final String argName = argumentNames.get(i).toString(); - final String argValue = argumentValues.get(i).toString(); - - attributes.put(argName, argValue); - } - - return Collections.unmodifiableMap(attributes); - } - - @Override - public String getStoreLocation() - { - return _storeLocation; - } - - /** - * Move the store state from INITIAL to ACTIVE without actually recovering. - * - * This is required if you do not want to perform recovery of the store data - * - * @throws org.apache.qpid.server.store.StoreException if the store is not in the correct state - */ - void startWithNoRecover() throws StoreException - { - _stateManager.attainState(State.INITIALISING); - _stateManager.attainState(State.INITIALISED); - _stateManager.attainState(State.ACTIVATING); - _stateManager.attainState(State.ACTIVE); - } - - protected void setupStore(File storePath, String name) - { - _environment = createEnvironment(storePath); - - new Upgrader(_environment, name).upgradeIfNecessary(); - - openDatabases(); - - _totalStoreSize = getSizeOnDisk(); - } - - protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException; - - public Environment getEnvironment() - { - return _environment; - } - - private void openDatabases() throws DatabaseException - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - - //This is required if we are wanting read only access. - dbConfig.setReadOnly(false); - - _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig); - _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig); - _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); - _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); - _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); - _linkDb = openDatabase(LINKDB_NAME, dbConfig); - _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig); - _xidDb = openDatabase(XIDDB_NAME, dbConfig); - } - - private Database openDatabase(final String dbName, final DatabaseConfig dbConfig) - { - // if opening read-only and the database doesn't exist, then you can't create it - return dbConfig.getReadOnly() && !_environment.getDatabaseNames().contains(dbName) - ? null - : _environment.openDatabase(null, dbName, dbConfig); - } - - /** - * Called to close and cleanup any resources used by the message store. - * - * @throws Exception If the close fails. - */ - public void close() - { - if (_closed.compareAndSet(false, true)) - { - _stateManager.attainState(State.CLOSING); - closeInternal(); - _stateManager.attainState(State.CLOSED); - } - } - - protected void closeInternal() - { - if (_messageMetaDataDb != null) - { - LOGGER.info("Closing message metadata database"); - _messageMetaDataDb.close(); - } - - if (_messageContentDb != null) - { - LOGGER.info("Closing message content database"); - _messageContentDb.close(); - } - - if (_configuredObjectsDb != null) - { - LOGGER.info("Closing configurable objects database"); - _configuredObjectsDb.close(); - } - - if (_deliveryDb != null) - { - LOGGER.info("Close delivery database"); - _deliveryDb.close(); - } - - if (_bridgeDb != null) - { - LOGGER.info("Close bridge database"); - _bridgeDb.close(); - } - - if (_linkDb != null) - { - LOGGER.info("Close link database"); - _linkDb.close(); - } - - - if (_xidDb != null) - { - LOGGER.info("Close xid database"); - _xidDb.close(); - } - - - if (_configVersionDb != null) - { - LOGGER.info("Close config version database"); - _configVersionDb.close(); - } - - closeEnvironment(); - - } - - private void closeEnvironment() throws DatabaseException - { - if (_environment != null) - { - // Clean the log before closing. This makes sure it doesn't contain - // redundant data. Closing without doing this means the cleaner may not - // get a chance to finish. - try - { - _environment.cleanLog(); - } - finally - { - _environment.close(); - } - } - } - - - private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) - { - try - { - final int configVersion = getConfigVersion(); - recoveryHandler.beginConfigurationRecovery(this, configVersion); - loadConfiguredObjects(recoveryHandler); - - final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); - if(newConfigVersion != configVersion) - { - updateConfigVersion(newConfigVersion); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - - } - - private void updateConfigVersion(int newConfigVersion) throws StoreException - { - Cursor cursor = null; - try - { - Transaction txn = _environment.beginTransaction(null, null); - cursor = _configVersionDb.openCursor(txn, null); - DatabaseEntry key = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0,key); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - IntegerBinding.intToEntry(newConfigVersion, value); - OperationStatus status = cursor.put(key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error setting config version: " + status); - } - } - cursor.close(); - cursor = null; - txn.commit(); - } - finally - { - closeCursorSafely(cursor); - } - - } - - private int getConfigVersion() throws StoreException - { - Cursor cursor = null; - try - { - cursor = _configVersionDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - return IntegerBinding.entryToInt(value); - } - - // Insert 0 as the default config version - IntegerBinding.intToEntry(0,value); - ByteBinding.byteToEntry((byte) 0,key); - OperationStatus status = _configVersionDb.put(null, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error initialising config version: " + status); - } - return 0; - } - finally - { - closeCursorSafely(cursor); - } - } - - private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException - { - Cursor cursor = null; - try - { - cursor = _configuredObjectsDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value); - crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes()); - } - - } - finally - { - closeCursorSafely(cursor); - } - } - - private void closeCursorSafely(Cursor cursor) - { - if (cursor != null) - { - cursor.close(); - } - } - - - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException - { - StoredMessageRecoveryHandler mrh = msrh.begin(); - - Cursor cursor = null; - try - { - cursor = _messageMetaDataDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); - - long maxId = 0; - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - long messageId = LongBinding.entryToLong(key); - StorableMessageMetaData metaData = valueBinding.entryToObject(value); - - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); - - mrh.message(message); - - maxId = Math.max(maxId, messageId); - } - - _messageId.set(maxId); - } - catch (DatabaseException e) - { - LOGGER.error("Database Error: " + e.getMessage(), e); - throw e; - } - finally - { - closeCursorSafely(cursor); - } - } - - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) - throws DatabaseException - { - QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); - - ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - - Cursor cursor = null; - try - { - cursor = _deliveryDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - QueueEntryKey qek = keyBinding.entryToObject(key); - - entries.add(qek); - } - - try - { - cursor.close(); - } - finally - { - cursor = null; - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - qerh.queueEntry(queueId, messageId); - } - } - catch (DatabaseException e) - { - LOGGER.error("Database Error: " + e.getMessage(), e); - throw e; - } - finally - { - closeCursorSafely(cursor); - } - - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery(); - - cursor = null; - try - { - cursor = _xidDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), - preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()); - } - - } - catch (DatabaseException e) - { - LOGGER.error("Database Error: " + e.getMessage(), e); - throw e; - } - finally - { - closeCursorSafely(cursor); - } - - - dtxrh.completeDtxRecordRecovery(); - } - - public void removeMessage(long messageId, boolean sync) throws StoreException - { - - boolean complete = false; - com.sleepycat.je.Transaction tx = null; - - Random rand = null; - int attempts = 0; - try - { - do - { - tx = null; - try - { - tx = _environment.beginTransaction(null, null); - - //remove the message meta data from the store - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removing message id " + messageId); - } - - - OperationStatus status = _messageMetaDataDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " + - messageId); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleted metadata for message " + messageId); - } - - //now remove the content data from the store if there is any. - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - _messageContentDb.delete(tx, contentKeyEntry); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleted content for message " + messageId); - } - - commit(tx, sync); - complete = true; - tx = null; - } - catch (LockConflictException e) - { - try - { - if(tx != null) - { - tx.abort(); - } - } - catch(DatabaseException e2) - { - LOGGER.warn("Unable to abort transaction after LockConflictExcption", e2); - // rethrow the original log conflict exception, the secondary exception should already have - // been logged. - throw e; - } - - - LOGGER.warn("Lock timeout exception. Retrying (attempt " - + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); - - if(++attempts < LOCK_RETRY_ATTEMPTS) - { - if(rand == null) - { - rand = new Random(); - } - - try - { - Thread.sleep(500l + (long)(500l * rand.nextDouble())); - } - catch (InterruptedException e1) - { - - } - } - else - { - // rethrow the lock conflict exception since we could not solve by retrying - throw e; - } - } - } - while(!complete); - } - catch (DatabaseException e) - { - LOGGER.error("Unexpected BDB exception", e); - - if (tx != null) - { - try - { - tx.abort(); - tx = null; - } - catch (DatabaseException e1) - { - throw new StoreException("Error aborting transaction " + e1, e1); - } - } - - throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); - } - finally - { - if (tx != null) - { - try - { - tx.abort(); - tx = null; - } - catch (DatabaseException e1) - { - throw new StoreException("Error aborting transaction " + e1, e1); - } - } - } - } - - @Override - public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes); - storeConfiguredObjectEntry(configuredObject); - } - } - - @Override - public void remove(UUID id, String type) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called"); - } - OperationStatus status = removeConfiguredObject(null, id); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Configured object of type " + type + " with id " + id + " not found"); - } - } - - @Override - public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException - { - com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); - Collection<UUID> removed = new ArrayList<UUID>(objects.length); - for(UUID id : objects) - { - if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS) - { - removed.add(id); - } - } - - txn.commit(); - return removed.toArray(new UUID[removed.size()]); - } - - @Override - public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException - { - update(false, id, type, attributes, null); - } - - public void update(ConfiguredObjectRecord... records) throws StoreException - { - update(false, records); - } - - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException - { - com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); - for(ConfiguredObjectRecord record : records) - { - update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn); - } - txn.commit(); - } - - private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws - StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Updating " +type + ", id: " + id); - } - - try - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(id, key); - - DatabaseEntry value = new DatabaseEntry(); - DatabaseEntry newValue = new DatabaseEntry(); - ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - - OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND)) - { - ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); - - // write the updated entry to the store - configuredObjectBinding.objectToEntry(newQueueRecord, newValue); - status = _configuredObjectsDb.put(txn, key, newValue); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error updating queue details within the store: " + status); - } - } - else if (status != OperationStatus.NOTFOUND) - { - throw new StoreException("Error finding queue details within the store: " + status); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error updating queue details within the store: " + e,e); - } - } - - /** - * Places a message onto a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The the queue to place the message on. - * @param messageId The message to enqueue. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. - */ - public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, - long messageId) throws StoreException - { - - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); - keyBinding.objectToEntry(dd, key); - DatabaseEntry value = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0, value); - - try - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Enqueuing message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " in transaction " + tx); - } - _deliveryDb.put(tx, key, value); - } - catch (DatabaseException e) - { - LOGGER.error("Failed to enqueue: " + e.getMessage(), e); - throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " to database", e); - } - } - - /** - * Extracts a message from a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The queue to take the message from. - * @param messageId The message to dequeue. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, - long messageId) throws StoreException - { - - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); - UUID id = queue.getId(); - keyBinding.objectToEntry(queueEntryKey, key); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Dequeue message id " + messageId + " from queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); - } - - try - { - - OperationStatus status = _deliveryDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Unable to find message with id " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); - } - else if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removed message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id - + " from delivery db"); - - } - } - catch (DatabaseException e) - { - - LOGGER.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e); - LOGGER.error(tx); - - throw new StoreException("Error accessing database while dequeuing message: " + e.getMessage(), e); - } - } - - - private void recordXid(com.sleepycat.je.Transaction txn, - long format, - byte[] globalId, - byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - Xid xid = new Xid(format, globalId, branchId); - XidBinding keyBinding = XidBinding.getInstance(); - keyBinding.objectToEntry(xid,key); - - DatabaseEntry value = new DatabaseEntry(); - PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - valueBinding.objectToEntry(preparedTransaction, value); - - try - { - _xidDb.put(txn, key, value); - } - catch (DatabaseException e) - { - LOGGER.error("Failed to write xid: " + e.getMessage(), e); - throw new StoreException("Error writing xid to database", e); - } - } - - private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId) - throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - Xid xid = new Xid(format, globalId, branchId); - XidBinding keyBinding = XidBinding.getInstance(); - - keyBinding.objectToEntry(xid, key); - - - try - { - - OperationStatus status = _xidDb.delete(txn, key); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Unable to find xid"); - } - else if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Unable to remove xid"); - } - - } - catch (DatabaseException e) - { - - LOGGER.error("Failed to remove xid ", e); - LOGGER.error(txn); - - throw new StoreException("Error accessing database while removing xid: " + e.getMessage(), e); - } - } - - /** - * Commits all operations performed within a given transaction. - * - * @param tx The transaction to commit all operations for. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. - */ - private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws - StoreException - { - if (tx == null) - { - throw new StoreException("Fatal internal error: transactional is null at commitTran"); - } - - StoreFuture result; - try - { - result = commit(tx, syncCommit); - - if (LOGGER.isDebugEnabled()) - { - String transactionType = syncCommit ? "synchronous" : "asynchronous"; - LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error commit tx: " + e.getMessage(), e); - } - - return result; - } - - /** - * Abandons all operations performed within a given transaction. - * - * @param tx The transaction to abandon. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. - */ - public void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("abortTran called for transaction " + tx); - } - - try - { - tx.abort(); - } - catch (DatabaseException e) - { - throw new StoreException("Error aborting transaction: " + e.getMessage(), e); - } - } - - /** - * Primarily for testing purposes. - * - * @param queueId - * - * @return a list of message ids for messages enqueued for a particular queue - */ - List<Long> getEnqueuedMessages(UUID queueId) throws StoreException - { - Cursor cursor = null; - try - { - cursor = _deliveryDb.openCursor(null, null); - - DatabaseEntry key = new DatabaseEntry(); - - QueueEntryKey dd = new QueueEntryKey(queueId, 0); - - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - keyBinding.objectToEntry(dd, key); - - DatabaseEntry value = new DatabaseEntry(); - - LinkedList<Long> messageIds = new LinkedList<Long>(); - - OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); - dd = keyBinding.entryToObject(key); - - while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId)) - { - - messageIds.add(dd.getMessageId()); - status = cursor.getNext(key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS) - { - dd = keyBinding.entryToObject(key); - } - } - - return messageIds; - } - catch (DatabaseException e) - { - throw new StoreException("Database error: " + e.getMessage(), e); - } - finally - { - if (cursor != null) - { - try - { - cursor.close(); - } - catch (DatabaseException e) - { - throw new StoreException("Error closing cursor: " + e.getMessage(), e); - } - } - } - } - - /** - * Return a valid, currently unused message id. - * - * @return A fresh message id. - */ - public long getNewMessageId() - { - return _messageId.incrementAndGet(); - } - - /** - * Stores a chunk of message data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param offset The offset of the data chunk in the message. - * @param contentBody The content of the data chunk. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, - ByteBuffer contentBody) throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - ContentBinding messageBinding = ContentBinding.getInstance(); - messageBinding.objectToEntry(contentBody.array(), value); - try - { - OperationStatus status = _messageContentDb.put(tx, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error adding content for message id " + messageId + ": " + status); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); - - } - } - catch (DatabaseException e) - { - throw new StoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Stores message meta-data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param messageMetaData The message meta data to store. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, - StorableMessageMetaData messageMetaData) - throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("storeMetaData called for transaction " + tx - + ", messageId " + messageId - + ", messageMetaData " + messageMetaData); - } - - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - - MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); - messageBinding.objectToEntry(messageMetaData, value); - try - { - _messageMetaDataDb.put(tx, key, value); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Retrieves message meta-data. - * - * @param messageId The message to get the meta-data for. - * - * @return The message meta data. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = " - + messageId + "): called"); - } - - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); - - try - { - OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Metadata not found for message with id " + messageId); - } - - StorableMessageMetaData mdd = messageBinding.entryToObject(value); - - return mdd; - } - catch (DatabaseException e) - { - throw new StoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e); - } - } - - /** - * Fills the provided ByteBuffer with as much content for the specified message as possible, starting - * from the specified offset in the message. - * - * @param messageId The message to get the data for. - * @param offset The offset of the data within the message. - * @param dst The destination of the content read back - * - * @return The number of bytes inserted into the destination - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException - { - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - DatabaseEntry value = new DatabaseEntry(); - ContentBinding contentTupleBinding = ContentBinding.getInstance(); - - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); - } - - try - { - - int written = 0; - OperationStatus status = _messageContentDb.get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); - if (status == OperationStatus.SUCCESS) - { - byte[] dataAsBytes = contentTupleBinding.entryToObject(value); - int size = dataAsBytes.length; - if (offset > size) - { - throw new StoreException("Offset " + offset + " is greater than message size " + size - + " for message id " + messageId + "!"); - - } - - written = size - offset; - if(written > dst.remaining()) - { - written = dst.remaining(); - } - - dst.put(dataAsBytes, offset, written); - } - return written; - } - catch (DatabaseException e) - { - throw new StoreException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - public boolean isPersistent() - { - return true; - } - - @SuppressWarnings("unchecked") - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) - { - if(metaData.isPersistent()) - { - return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData); - } - else - { - return new StoredMemoryMessage(getNewMessageId(), metaData); - } - } - - //Package getters for the various databases used by the Store - - Database getMetaDataDb() - { - return _messageMetaDataDb; - } - - Database getContentDb() - { - return _messageContentDb; - } - - Database getDeliveryDb() - { - return _deliveryDb; - } - - /** - * Makes the specified configured object persistent. - * - * @param configuredObject Details of the configured object to store. - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. - */ - private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - LOGGER.debug("Storing configured object: " + configuredObject); - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(configuredObject.getId(), key); - - DatabaseEntry value = new DatabaseEntry(); - ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); - - queueBinding.objectToEntry(configuredObject, value); - try - { - OperationStatus status = _configuredObjectsDb.put(null, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error writing configured object " + configuredObject + " to database: " - + status); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error writing configured object " + configuredObject - + " to database: " + e.getMessage(), e); - } - } - } - - private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws StoreException - { - - LOGGER.debug("Removing configured object: " + id); - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); - uuidBinding.objectToEntry(id, key); - try - { - return _configuredObjectsDb.delete(tx, key); - } - catch (DatabaseException e) - { - throw new StoreException("Error deleting of configured object with id " + id + " from database", e); - } - } - - protected abstract StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException; - - - private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData> - { - - private final long _messageId; - private final boolean _isRecovered; - - private StorableMessageMetaData _metaData; - private volatile SoftReference<StorableMessageMetaData> _metaDataRef; - - private byte[] _data; - private volatile SoftReference<byte[]> _dataRef; - - StoredBDBMessage(long messageId, StorableMessageMetaData metaData) - { - this(messageId, metaData, false); - } - - StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered) - { - _messageId = messageId; - _isRecovered = isRecovered; - - if(!_isRecovered) - { - _metaData = metaData; - } - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - } - - public StorableMessageMetaData getMetaData() - { - StorableMessageMetaData metaData = _metaDataRef.get(); - if(metaData == null) - { - metaData = AbstractBDBMessageStore.this.getMessageMetaData(_messageId); - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - } - - return metaData; - } - - public long getMessageNumber() - { - return _messageId; - } - - public void addContent(int offsetInMessage, java.nio.ByteBuffer src) - { - src = src.slice(); - - if(_data == null) - { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - src.duplicate().get(_data); - } - else - { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - - System.arraycopy(oldData,0,_data,0,oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); - } - - } - - public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) - { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; - } - else - { - return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); - } - } - - public ByteBuffer getContent(int offsetInMessage, int size) - { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - return ByteBuffer.wrap(data,offsetInMessage,size); - } - else - { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.limit(length); - buf.position(0); - return buf; - } - } - - synchronized void store(com.sleepycat.je.Transaction txn) - { - if (!stored()) - { - try - { - _dataRef = new SoftReference<byte[]>(_data); - AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); - AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); - } - catch(DatabaseException e) - { - throw new StoreException(e); - } - finally - { - _metaData = null; - _data = null; - } - } - } - - public synchronized StoreFuture flushToStore() - { - if(!stored()) - { - com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); - store(txn); - AbstractBDBMessageStore.this.commit(txn,true); - storedSizeChange(getMetaData().getContentSize()); - } - return StoreFuture.IMMEDIATE_FUTURE; - } - - public void remove() - { - int delta = getMetaData().getContentSize(); - AbstractBDBMessageStore.this.removeMessage(_messageId, false); - storedSizeChange(-delta); - } - - private boolean stored() - { - return _metaData == null || _isRecovered; - } - } - - private class BDBTransaction implements org.apache.qpid.server.store.Transaction - { - private com.sleepycat.je.Transaction _txn; - private int _storeSizeIncrease; - - private BDBTransaction() - { - try - { - _txn = _environment.beginTransaction(null, null); - } - catch (DatabaseException e) - { - LOGGER.error("Exception during transaction begin, closing store environment.", e); - closeEnvironmentSafely(); - - throw new StoreException("Exception during transaction begin, store environment closed.", e); - } - } - - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - if(message.getStoredMessage() instanceof StoredBDBMessage) - { - final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); - } - - AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); - } - - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); - } - - public void commitTran() - { - AbstractBDBMessageStore.this.commitTranImpl(_txn, true); - AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); - } - - public StoreFuture commitTranAsync() - { - AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); - return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); - } - - public void abortTran() - { - AbstractBDBMessageStore.this.abortTran(_txn); - } - - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId); - - } - - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, - Record[] dequeues) - { - AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); - } - } - - private void storedSizeChange(final int delta) - { - if(getPersistentSizeHighThreshold() > 0) - { - synchronized (this) - { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 2*delta; - - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - _totalStoreSize = getSizeOnDisk(); - - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - } - else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) - { - long oldSize = _totalStoreSize; - _totalStoreSize = getSizeOnDisk(); - - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(); - - _totalStoreSize = getSizeOnDisk(); - - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - - } - } - } - } - - private void reduceSizeOnDisk() - { - _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); - boolean cleaned = false; - while (_environment.cleanLog() > 0) - { - cleaned = true; - } - if (cleaned) - { - CheckpointConfig force = new CheckpointConfig(); - force.setForce(true); - _environment.checkpoint(force); - } - - - _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); - } - - private long getSizeOnDisk() - { - return _environment.getStats(null).getTotalLogSize(); - } - - private long getPersistentSizeLowThreshold() - { - return _persistentSizeLowThreshold; - } - - private long getPersistentSizeHighThreshold() - { - return _persistentSizeHighThreshold; - } - - private void setEnvironmentConfigProperties(EnvironmentConfig envConfig) - { - for (Map.Entry<String, String> configItem : _envConfigMap.entrySet()) - { - LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); - envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); - } - } - - protected EnvironmentConfig createEnvironmentConfig() - { - EnvironmentConfig envConfig = new EnvironmentConfig(); - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - - setEnvironmentConfigProperties(envConfig); - - envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); - - return envConfig; - } - - protected void closeEnvironmentSafely() - { - try - { - _environment.close(); - } - catch (DatabaseException ex) - { - LOGGER.error("Exception closing store environment", ex); - } - catch (IllegalStateException ex) - { - LOGGER.error("Exception closing store environment", ex); - } - } - - - private class LoggingAsyncExceptionListener implements ExceptionListener - { - @Override - public void exceptionThrown(ExceptionEvent event) - { - LOGGER.error("Asynchronous exception thrown by BDB thread '" - + event.getThreadName() + "'", event.getException()); - } - } - - @Override - public void onDelete() - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleting store " + _storeLocation); - } - - if (_storeLocation != null) - { - File location = new File(_storeLocation); - if (location.exists()) - { - if (!FileUtils.delete(location, true)) - { - LOGGER.error("Cannot delete " + _storeLocation); - } - } - } - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java deleted file mode 100644 index d99733acf0..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ /dev/null @@ -1,665 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.net.InetSocketAddress; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.TaskPrincipal; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.HAMessageStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.State; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; - -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Durability; -import com.sleepycat.je.Durability.ReplicaAckPolicy; -import com.sleepycat.je.Durability.SyncPolicy; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.OperationFailureException; -import com.sleepycat.je.Transaction; -import com.sleepycat.je.rep.InsufficientLogException; -import com.sleepycat.je.rep.NetworkRestore; -import com.sleepycat.je.rep.NetworkRestoreConfig; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; -import com.sleepycat.je.rep.ReplicationMutableConfig; -import com.sleepycat.je.rep.ReplicationNode; -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; -import com.sleepycat.je.rep.util.ReplicationGroupAdmin; - -import javax.security.auth.Subject; - -public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore -{ - private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); - - private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); - - public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; - public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; - - @SuppressWarnings("serial") - private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() - {{ - /** - * Parameter decreased as the 24h default may lead very large log files for most users. - */ - put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h"); - /** - * Parameter increased as the 5 s default may lead to spurious timeouts. - */ - put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s"); - /** - * Parameter increased as the 10 s default may lead to spurious timeouts. - */ - put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s"); - /** - * Parameter increased as the 10 h default may cause user confusion. - */ - put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); - /** - * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False - * is scheduled to become default after JE 5.0.48. - */ - put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString()); - /** - * Parameter decreased as a default 5min interval may lead to bigger data losses on Node - * with NO_SYN durability in case if such Node crushes. - */ - put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); - }}); - - public static final String TYPE = "BDB-HA"; - - private String _groupName; - private String _nodeName; - private String _nodeHostPort; - private String _helperHostPort; - private Durability _durability; - - private String _name; - - private CommitThreadWrapper _commitThreadWrapper; - private boolean _coalescingSync; - private boolean _designatedPrimary; - private Map<String, String> _repConfig; - - @Override - public void configure(VirtualHost virtualHost) - { - //Mandatory configuration - _groupName = getValidatedStringAttribute(virtualHost, "haGroupName"); - _nodeName = getValidatedStringAttribute(virtualHost, "haNodeName"); - _nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress"); - _helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress"); - _name = virtualHost.getName(); - - //Optional configuration - String durabilitySetting = getStringAttribute(virtualHost,"haDurability",null); - if (durabilitySetting == null) - { - _durability = DEFAULT_DURABILITY; - } - else - { - _durability = Durability.parse(durabilitySetting); - } - _designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", Boolean.FALSE); - _coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", Boolean.TRUE); - - _repConfig = new HashMap<String, String>(REPCONFIG_DEFAULTS); - Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig"); - if(repConfigAttr instanceof Map) - { - _repConfig.putAll((Map)repConfigAttr); - } - - if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC) - { - throw new StoreException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC - + "! Please set highAvailability.coalescingSync to false in store configuration."); - } - - super.configure(virtualHost); - } - - - private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName) - { - Object attrValue = virtualHost.getAttribute(attributeName); - if(attrValue != null) - { - return attrValue.toString(); - } - else - { - throw new StoreException("BDB HA configuration key not found. Please specify configuration attribute: " - + attributeName); - } - } - - private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal) - { - Object attrValue = virtualHost.getAttribute(attributeName); - if(attrValue != null) - { - return attrValue.toString(); - } - return defaultVal; - } - - private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal) - { - Object attrValue = virtualHost.getAttribute(attributeName); - if(attrValue != null) - { - if(attrValue instanceof Boolean) - { - return ((Boolean) attrValue).booleanValue(); - } - else if(attrValue instanceof String) - { - return Boolean.parseBoolean((String)attrValue); - } - - } - return defaultVal; - } - - - @Override - protected void setupStore(File storePath, String name) throws DatabaseException - { - super.setupStore(storePath, name); - - if(_coalescingSync) - { - _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); - _commitThreadWrapper.startCommitThread(); - } - } - - @Override - protected Environment createEnvironment(File environmentPath) throws DatabaseException - { - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Environment path " + environmentPath.getAbsolutePath()); - LOGGER.info("Group name " + _groupName); - LOGGER.info("Node name " + _nodeName); - LOGGER.info("Node host port " + _nodeHostPort); - LOGGER.info("Helper host port " + _helperHostPort); - LOGGER.info("Durability " + _durability); - LOGGER.info("Coalescing sync " + _coalescingSync); - LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary); - } - - final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); - - replicationConfig.setHelperHosts(_helperHostPort); - replicationConfig.setDesignatedPrimary(_designatedPrimary); - setReplicationConfigProperties(replicationConfig); - - final EnvironmentConfig envConfig = createEnvironmentConfig(); - envConfig.setDurability(_durability); - - ReplicatedEnvironment replicatedEnvironment = null; - try - { - replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); - } - catch (final InsufficientLogException ile) - { - LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); - NetworkRestore restore = new NetworkRestore(); - NetworkRestoreConfig config = new NetworkRestoreConfig(); - config.setRetainLogFiles(false); - restore.execute(ile, config); - replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); - } - - return replicatedEnvironment; - } - - @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) - { - super.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler); - - final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); - - replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); - } - - @Override - public synchronized void activate() - { - // Before proceeding, perform a log flush with an fsync - getEnvironment().flushLog(true); - - super.activate(); - } - - @Override - public synchronized void passivate() - { - if (_stateManager.isNotInState(State.INITIALISED)) - { - LOGGER.debug("Store becoming passive"); - _stateManager.attainState(State.INITIALISED); - } - } - - public String getName() - { - return _name; - } - - public String getGroupName() - { - return _groupName; - } - - public String getNodeName() - { - return _nodeName; - } - - public String getNodeHostPort() - { - return _nodeHostPort; - } - - public String getHelperHostPort() - { - return _helperHostPort; - } - - public String getDurability() - { - return _durability.toString(); - } - - public boolean isCoalescingSync() - { - return _coalescingSync; - } - - public String getNodeState() - { - ReplicatedEnvironment.State state = getReplicatedEnvironment().getState(); - return state.toString(); - } - - public Boolean isDesignatedPrimary() - { - return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary(); - } - - public List<Map<String, String>> getGroupMembers() - { - List<Map<String, String>> members = new ArrayList<Map<String,String>>(); - - for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes()) - { - Map<String, String> nodeMap = new HashMap<String, String>(); - nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName()); - nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); - members.add(nodeMap); - } - - return members; - } - - public void removeNodeFromGroup(String nodeName) throws StoreException - { - try - { - createReplicationGroupAdmin().removeMember(nodeName); - } - catch (OperationFailureException ofe) - { - throw new StoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe); - } - catch (DatabaseException e) - { - throw new StoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e); - } - } - - public void setDesignatedPrimary(boolean isPrimary) throws StoreException - { - try - { - final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); - synchronized(replicatedEnvironment) - { - final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig(); - final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); - replicatedEnvironment.setRepMutableConfig(newConfig); - } - - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group"); - } - } - catch (DatabaseException e) - { - throw new StoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e); - } - } - - ReplicatedEnvironment getReplicatedEnvironment() - { - return (ReplicatedEnvironment)getEnvironment(); - } - - public void updateAddress(String nodeName, String newHostName, int newPort) throws StoreException - { - try - { - createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); - } - catch (OperationFailureException ofe) - { - throw new StoreException("Failed to update address for '" + nodeName + - "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe); - } - catch (DatabaseException e) - { - throw new StoreException("Failed to update address for '" + nodeName + - "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e); - } - } - - @Override - protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException - { - // Using commit() instead of commitNoSync() for the HA store to allow - // the HA durability configuration to influence resulting behaviour. - try - { - tx.commit(); - } - catch (DatabaseException de) - { - LOGGER.error("Got DatabaseException on commit, closing environment", de); - - closeEnvironmentSafely(); - - throw de; - } - - if(_coalescingSync) - { - return _commitThreadWrapper.commit(tx, syncCommit); - } - else - { - return StoreFuture.IMMEDIATE_FUTURE; - } - } - - @Override - protected void closeInternal() - { - substituteNoOpStateChangeListenerOn(getReplicatedEnvironment()); - - try - { - if(_coalescingSync) - { - try - { - _commitThreadWrapper.stopCommitThread(); - } - catch (InterruptedException e) - { - throw new StoreException(e); - } - } - } - finally - { - super.closeInternal(); - } - } - - /** - * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED} during - * {@link Environment#close()}. We replace the StateChangeListener so we silently ignore this state change. - */ - private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment) - { - LOGGER.debug("Substituting no-op state change listener for environment close"); - replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener()); - } - - private ReplicationGroupAdmin createReplicationGroupAdmin() - { - final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); - helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets()); - - final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig(); - helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); - - return new ReplicationGroupAdmin(_groupName, helpers); - } - - - private void setReplicationConfigProperties(ReplicationConfig replicationConfig) - { - for (Map.Entry<String, String> configItem : _repConfig.entrySet()) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); - } - replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); - } - } - - private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException - { - if (!config.containsKey(key)) - { - throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " - + key.replace('.', '/')); - } - return config.getString(key); - } - - private class BDBHAMessageStoreStateChangeListener implements StateChangeListener - { - private final Executor _executor = Executors.newSingleThreadExecutor(); - - @Override - public void stateChange(StateChangeEvent stateChangeEvent) - { - com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState(); - - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Received BDB event indicating transition to state " + state); - } - - switch (state) - { - case MASTER: - activateStoreAsync(); - break; - case REPLICA: - passivateStoreAsync(); - break; - case DETACHED: - LOGGER.error("BDB replicated node in detached state, therefore passivating."); - passivateStoreAsync(); - break; - case UNKNOWN: - LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); - break; - default: - LOGGER.error("Unexpected state change: " + state); - throw new IllegalStateException("Unexpected state change: " + state); - } - } - - /** - * Calls {@link MessageStore#activate()}. - * - * <p/> - * - * This is done a background thread, in line with - * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because - * activate may execute transactions, which can't complete until - * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned. - */ - private void activateStoreAsync() - { - String threadName = "BDBHANodeActivationThread-" + _name; - executeStateChangeAsync(new Callable<Void>() - { - @Override - public Void call() throws Exception - { - try - { - activate(); - } - catch (Exception e) - { - LOGGER.error("Failed to activate on hearing MASTER change event",e); - throw e; - } - return null; - } - }, threadName); - } - - /** - * Calls {@link #passivate()}. - * - * <p/> - * This is done a background thread, in line with - * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because - * passivation due to the effect of state change listeners. - */ - private void passivateStoreAsync() - { - String threadName = "BDBHANodePassivationThread-" + _name; - executeStateChangeAsync(new Callable<Void>() - { - - @Override - public Void call() throws Exception - { - try - { - passivate(); - } - catch (Exception e) - { - LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e); - throw e; - } - return null; - } - }, threadName); - } - - private void executeStateChangeAsync(final Callable<Void> callable, final String threadName) - { - - _executor.execute(new Runnable() - { - - @Override - public void run() - { - final String originalThreadName = Thread.currentThread().getName(); - Thread.currentThread().setName(threadName); - - try - { - Subject.doAs(SecurityManager.getSystemTaskSubject("BDB HA State Change"), new PrivilegedAction<Object>() - { - @Override - public Object run() - { - - try - { - callable.call(); - } - catch (Exception e) - { - LOGGER.error("Exception during state change", e); - } - return null; - } - }); - } - finally - { - Thread.currentThread().setName(originalThreadName); - } - } - }); - } - } - - private class NoOpStateChangeListener implements StateChangeListener - { - @Override - public void stateChange(StateChangeEvent stateChangeEvent) - { - } - } - - @Override - public String getStoreType() - { - return TYPE; - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index b055f8bd90..3fdc12ba31 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.store.berkeleydb; * */ +import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; @@ -31,15 +32,22 @@ import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.OperationalLoggingListener; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider; import org.apache.qpid.server.virtualhost.State; import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + public class BDBHAVirtualHost extends AbstractVirtualHost { - private BDBHAMessageStore _messageStore; + private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class); + + private BDBMessageStore _messageStore; private boolean _inVhostInitiatedClose; @@ -52,11 +60,9 @@ public class BDBHAVirtualHost extends AbstractVirtualHost super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost); } - - protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) { - _messageStore = new BDBHAMessageStore(); + _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); @@ -84,6 +90,11 @@ public class BDBHAVirtualHost extends AbstractVirtualHost virtualHost, recoveryHandler, recoveryHandler ); + + // Make the virtualhost model object a replication group listener + ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade(); + environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); + } @@ -194,4 +205,70 @@ public class BDBHAVirtualHost extends AbstractVirtualHost } } + private class BDBHAMessageStoreStateChangeListener implements StateChangeListener + { + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState(); + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Received BDB event indicating transition to state " + state + + " when current message store state is " + _messageStore._stateManager.getState()); + } + + switch (state) + { + case MASTER: + activate(); + break; + case REPLICA: + passivate(); + break; + case DETACHED: + LOGGER.error("BDB replicated node in detached state, therefore passivating."); + passivate(); + break; + case UNKNOWN: + LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); + break; + default: + LOGGER.error("Unexpected state change: " + state); + throw new IllegalStateException("Unexpected state change: " + state); + } + } + + private void activate() + { + try + { + _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); + _messageStore.activate(); + } + catch (Exception e) + { + LOGGER.error("Failed to activate on hearing MASTER change event", e); + } + } + + private void passivate() + { + try + { + //TODO: move this this into the store method passivate() + if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED)) + { + _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); + } + } + catch (Exception e) + { + LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e); + } + } + + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index acff8e2b21..35dae4b800 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -20,16 +20,44 @@ */ package org.apache.qpid.server.store.berkeleydb; +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.*; +import com.sleepycat.je.Transaction; + import java.io.File; +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; - -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.*; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; +import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; +import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; +import org.apache.qpid.server.store.berkeleydb.entry.Xid; +import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.util.FileUtils; /** * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. @@ -39,83 +67,1623 @@ import com.sleepycat.je.EnvironmentConfig; * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and * dequeue messages to queues. <tr><td> Generate message identifiers. </table> */ -public class BDBMessageStore extends AbstractBDBMessageStore +public class BDBMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - public static final String TYPE = "BDB"; - private CommitThreadWrapper _commitThreadWrapper; + + public static final int VERSION = 7; + public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; + + private static final int LOCK_RETRY_ATTEMPTS = 5; + private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; + private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; + private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; + private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; + private static String BRIDGEDB_NAME = "BRIDGES"; + private static String LINKDB_NAME = "LINKS"; + private static String XID_DB_NAME = "XIDS"; + private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION"; + private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, MESSAGE_META_DATA_DB_NAME, + MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME }; + + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private EnvironmentFacade _environmentFacade; + private final AtomicLong _messageId = new AtomicLong(0); + + protected final StateManager _stateManager; + + private MessageStoreRecoveryHandler _messageRecoveryHandler; + + private TransactionLogRecoveryHandler _tlogRecoveryHandler; + + private ConfigurationRecoveryHandler _configRecoveryHandler; + + private long _totalStoreSize; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + + private final EventManager _eventManager = new EventManager(); + private final String _type; + private VirtualHost _virtualHost; + + private final EnvironmentFacadeFactory _environmentFacadeFactory; + + private volatile Committer _committer; + + public BDBMessageStore() + { + this(new StandardEnvironmentFacadeFactory()); + } + + public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) + { + _type = environmentFacadeFactory.getType();; + _environmentFacadeFactory = environmentFacadeFactory; + _stateManager = new StateManager(_eventManager); + } @Override - protected void setupStore(File storePath, String name) throws DatabaseException + public void addEventListener(EventListener eventListener, Event... events) { - super.setupStore(storePath, name); + _eventManager.addEventListener(eventListener, events); + } - _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); - _commitThreadWrapper.startCommitThread(); + @Override + public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + { + _stateManager.attainState(State.INITIALISING); + + _configRecoveryHandler = recoveryHandler; + _virtualHost = virtualHost; + } + + @Override + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler) throws StoreException + { + if(_stateManager.isInState(State.INITIAL)) + { + // Is acting as a message store, but not a durable config store + _stateManager.attainState(State.INITIALISING); + } + + _messageRecoveryHandler = messageRecoveryHandler; + _tlogRecoveryHandler = tlogRecoveryHandler; + _virtualHost = virtualHost; + + + completeInitialisation(); } - protected Environment createEnvironment(File environmentPath) throws DatabaseException + private void completeInitialisation() throws StoreException { - LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = createEnvironmentConfig(); + configure(_virtualHost, _messageRecoveryHandler != null); + _stateManager.attainState(State.INITIALISED); + } + + private void startActivation() throws StoreException + { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); try { - return new Environment(environmentPath, envConfig); + new Upgrader(_environmentFacade.getEnvironment(), _virtualHost.getName()).upgradeIfNecessary(); + _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES); + _totalStoreSize = getSizeOnDisk(); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot configure store", e); } - catch (DatabaseException de) + + } + + @Override + public synchronized void activate() throws StoreException + { + // check if acting as a durable config store, but not a message store + if(_stateManager.isInState(State.INITIALISING)) { - if (de.getMessage().contains("Environment.setAllowCreate is false")) + completeInitialisation(); + } + + _stateManager.attainState(State.ACTIVATING); + startActivation(); + + if(_configRecoveryHandler != null) + { + recoverConfig(_configRecoveryHandler); + } + if(_messageRecoveryHandler != null) + { + recoverMessages(_messageRecoveryHandler); + } + if(_tlogRecoveryHandler != null) + { + recoverQueueEntries(_tlogRecoveryHandler); + } + + _stateManager.attainState(State.ACTIVE); + } + + @Override + public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException + { + return new BDBTransaction(); + } + + private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException + { + Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + + + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore); + + _committer = _environmentFacade.createCommitter(virtualHost.getName()); + _committer.start(); + } + + @Override + public String getStoreLocation() + { + if (_environmentFacade == null) + { + return null; + } + return _environmentFacade.getStoreLocation(); + } + + public EnvironmentFacade getEnvironmentFacade() + { + return _environmentFacade; + } + + /** + * Called to close and cleanup any resources used by the message store. + * + * @throws Exception If the close fails. + */ + @Override + public void close() throws StoreException + { + if (_closed.compareAndSet(false, true)) + { + _stateManager.attainState(State.CLOSING); + try { - //Allow the creation this time - envConfig.setAllowCreate(true); - return new Environment(environmentPath, envConfig); + try + { + _committer.stop(); + } + finally + { + closeEnvironment(); + } } - else + catch(DatabaseException e) + { + throw new StoreException("Exception occured on message store close", e); + } + _stateManager.attainState(State.CLOSED); + } + } + + private void closeEnvironment() + { + if (_environmentFacade != null) + { + _environmentFacade.close(); + } + } + + private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException + { + try + { + final int configVersion = getConfigVersion(); + recoveryHandler.beginConfigurationRecovery(this, configVersion); + loadConfiguredObjects(recoveryHandler); + + final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); + if(newConfigVersion != configVersion) + { + updateConfigVersion(newConfigVersion); + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e); + } + + } + + @SuppressWarnings("resource") + private void updateConfigVersion(int newConfigVersion) throws StoreException + { + Cursor cursor = null; + try + { + Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + cursor = getConfigVersionDb().openCursor(txn, null); + DatabaseEntry key = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0,key); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + IntegerBinding.intToEntry(newConfigVersion, value); + OperationStatus status = cursor.put(key, value); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error setting config version: " + status); + } + } + cursor.close(); + cursor = null; + txn.commit(); + } + finally + { + closeCursorSafely(cursor); + } + + } + + private int getConfigVersion() throws StoreException + { + Cursor cursor = null; + try + { + cursor = getConfigVersionDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + return IntegerBinding.entryToInt(value); + } + + // Insert 0 as the default config version + IntegerBinding.intToEntry(0,value); + ByteBinding.byteToEntry((byte) 0,key); + OperationStatus status = getConfigVersionDb().put(null, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error initialising config version: " + status); + } + return 0; + } + finally + { + closeCursorSafely(cursor); + } + } + + private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException + { + Cursor cursor = null; + try + { + cursor = getConfiguredObjectsDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + + ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value); + crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes()); + } + + } + finally + { + closeCursorSafely(cursor); + } + } + + private void closeCursorSafely(Cursor cursor) throws StoreException + { + if (cursor != null) + { + try + { + cursor.close(); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot close cursor", e); + } + } + } + + + private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException + { + StoredMessageRecoveryHandler mrh = msrh.begin(); + + Cursor cursor = null; + try + { + cursor = getMessageMetaDataDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); + + long maxId = 0; + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + long messageId = LongBinding.entryToLong(key); + StorableMessageMetaData metaData = valueBinding.entryToObject(value); + + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); + + mrh.message(message); + + maxId = Math.max(maxId, messageId); + } + + _messageId.set(maxId); + mrh.completeMessageRecovery(); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot recover messages", e); + } + finally + { + closeCursorSafely(cursor); + } + } + + private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) + throws StoreException + { + QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); + + ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + + Cursor cursor = null; + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + QueueEntryKey qek = keyBinding.entryToObject(key); + + entries.add(qek); + } + + try { - throw de; + cursor.close(); + } + finally + { + cursor = null; + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + qerh.queueEntry(queueId, messageId); } } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e); + } + finally + { + closeCursorSafely(cursor); + } + + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery(); + + cursor = null; + try + { + cursor = getXidDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidBinding keyBinding = XidBinding.getInstance(); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), + preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()); + } + + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e); + } + finally + { + closeCursorSafely(cursor); + } + + + dtxrh.completeDtxRecordRecovery(); + } + + public void removeMessage(long messageId, boolean sync) throws StoreException + { + + boolean complete = false; + com.sleepycat.je.Transaction tx = null; + + Random rand = null; + int attempts = 0; + try + { + do + { + tx = null; + try + { + tx = _environmentFacade.getEnvironment().beginTransaction(null, null); + + //remove the message meta data from the store + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Removing message id " + messageId); + } + + + OperationStatus status = getMessageMetaDataDb().delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " + + messageId); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleted metadata for message " + messageId); + } + + //now remove the content data from the store if there is any. + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + getMessageContentDb().delete(tx, contentKeyEntry); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleted content for message " + messageId); + } + + _environmentFacade.commit(tx); + _committer.commit(tx, sync); + + complete = true; + tx = null; + } + catch (LockConflictException e) + { + try + { + if(tx != null) + { + tx.abort(); + } + } + catch(DatabaseException e2) + { + LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2); + // rethrow the original log conflict exception, the secondary exception should already have + // been logged. + throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e); + } + + + LOGGER.warn("Lock timeout exception. Retrying (attempt " + + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); + + if(++attempts < LOCK_RETRY_ATTEMPTS) + { + if(rand == null) + { + rand = new Random(); + } + + try + { + Thread.sleep(500l + (long)(500l * rand.nextDouble())); + } + catch (InterruptedException e1) + { + + } + } + else + { + // rethrow the lock conflict exception since we could not solve by retrying + throw _environmentFacade.handleDatabaseException("Cannot remove messages", e); + } + } + } + while(!complete); + } + catch (DatabaseException e) + { + LOGGER.error("Unexpected BDB exception", e); + + try + { + abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx); + } + finally + { + tx = null; + } + + throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); + } + finally + { + try + { + abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx); + } + finally + { + tx = null; + } + } + } + + private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx) + { + try + { + if (tx != null) + { + tx.abort(); + } + } + catch (DatabaseException e1) + { + // We need the possible side effect of the handler restarting the environment but don't care about the exception + _environmentFacade.handleDatabaseException(null, e1); + LOGGER.warn(errorMessage, e1); + } } @Override - protected void closeInternal() + public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException + { + if (_stateManager.isInState(State.ACTIVE)) + { + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes); + storeConfiguredObjectEntry(configuredObject); + } + } + + @Override + public void remove(UUID id, String type) throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called"); + } + OperationStatus status = removeConfiguredObject(null, id); + if (status == OperationStatus.NOTFOUND) + { + throw new StoreException("Configured object of type " + type + " with id " + id + " not found"); + } + } + + @Override + public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException + { + com.sleepycat.je.Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + Collection<UUID> removed = new ArrayList<UUID>(objects.length); + for(UUID id : objects) + { + if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS) + { + removed.add(id); + } + } + commitTransaction(txn); + return removed.toArray(new UUID[removed.size()]); + } + + private void commitTransaction(com.sleepycat.je.Transaction txn) throws StoreException + { + try + { + txn.commit(); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot commit transaction on configured objects removal", e); + } + } + + @Override + public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException + { + update(false, id, type, attributes, null); + } + + @Override + public void update(ConfiguredObjectRecord... records) throws StoreException + { + update(false, records); + } + + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException + { + com.sleepycat.je.Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + for(ConfiguredObjectRecord record : records) + { + update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn); + } + commitTransaction(txn); + } + + private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Updating " +type + ", id: " + id); + } + + try + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); + keyBinding.objectToEntry(id, key); + + DatabaseEntry value = new DatabaseEntry(); + DatabaseEntry newValue = new DatabaseEntry(); + ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); + + OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT); + if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND)) + { + ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); + + // write the updated entry to the store + configuredObjectBinding.objectToEntry(newQueueRecord, newValue); + status = getConfiguredObjectsDb().put(txn, key, newValue); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error updating configuration details within the store: " + status); + } + } + else if (status != OperationStatus.NOTFOUND) + { + throw new StoreException("Error finding configuration details within the store: " + status); + } + } + catch (DatabaseException e) + { + if (txn != null) + { + abortTransactionIgnoringException("Error updating configuration details within the store: " + e.getMessage(), txn); + } + throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e); + } + } + + /** + * Places a message onto a specified queue, in a given transaction. + * + * @param tx The transaction for the operation. + * @param queue The the queue to place the message on. + * @param messageId The message to enqueue. + * + * @throws StoreException If the operation fails for any reason. + */ + public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + long messageId) throws StoreException + { + + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); + keyBinding.objectToEntry(dd, key); + DatabaseEntry value = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0, value); + + try + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Enqueuing message " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() + + " in transaction " + tx); + } + getDeliveryDb().put(tx, key, value); + } + catch (DatabaseException e) + { + LOGGER.error("Failed to enqueue: " + e.getMessage(), e); + throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() + + " to database", e); + } + } + + /** + * Extracts a message from a specified queue, in a given transaction. + * + * @param tx The transaction for the operation. + * @param queue The queue to take the message from. + * @param messageId The message to dequeue. + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + long messageId) throws StoreException + { + + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); + UUID id = queue.getId(); + keyBinding.objectToEntry(queueEntryKey, key); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Dequeue message id " + messageId + " from queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + } + + try + { + + OperationStatus status = getDeliveryDb().delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + throw new StoreException("Unable to find message with id " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + } + else if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Unable to remove message with id " + messageId + " on queue" + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Removed message " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id + + " from delivery db"); + + } + } + catch (DatabaseException e) + { + + LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e); + + throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e); + } + } + + + private void recordXid(com.sleepycat.je.Transaction txn, + long format, + byte[] globalId, + byte[] branchId, + org.apache.qpid.server.store.Transaction.Record[] enqueues, + org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidBinding keyBinding = XidBinding.getInstance(); + keyBinding.objectToEntry(xid,key); + + DatabaseEntry value = new DatabaseEntry(); + PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + valueBinding.objectToEntry(preparedTransaction, value); + try { - _commitThreadWrapper.stopCommitThread(); + getXidDb().put(txn, key, value); + } + catch (DatabaseException e) + { + LOGGER.error("Failed to write xid: " + e.getMessage(), e); + throw _environmentFacade.handleDatabaseException("Error writing xid to database", e); } - catch (InterruptedException e) + } + + private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId) + throws StoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidBinding keyBinding = XidBinding.getInstance(); + + keyBinding.objectToEntry(xid, key); + + + try + { + + OperationStatus status = getXidDb().delete(txn, key); + if (status == OperationStatus.NOTFOUND) + { + throw new StoreException("Unable to find xid"); + } + else if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Unable to remove xid"); + } + + } + catch (DatabaseException e) + { + + LOGGER.error("Failed to remove xid in transaction " + txn, e); + + throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e); + } + } + + /** + * Commits all operations performed within a given transaction. + * + * @param tx The transaction to commit all operations for. + * + * @throws StoreException If the operation fails for any reason. + */ + private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException + { + if (tx == null) + { + throw new StoreException("Fatal internal error: transactional is null at commitTran"); + } + + _environmentFacade.commit(tx); + StoreFuture result = _committer.commit(tx, syncCommit); + + if (LOGGER.isDebugEnabled()) { - throw new StoreException(e); + String transactionType = syncCommit ? "synchronous" : "asynchronous"; + LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); } - super.closeInternal(); + return result; + } + + /** + * Abandons all operations performed within a given transaction. + * + * @param tx The transaction to abandon. + * + * @throws StoreException If the operation fails for any reason. + */ + public void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("abortTran called for transaction " + tx); + } + + try + { + tx.abort(); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e); + } + } + + /** + * Primarily for testing purposes. + * + * @param queueId + * + * @return a list of message ids for messages enqueued for a particular queue + */ + List<Long> getEnqueuedMessages(UUID queueId) throws StoreException + { + Cursor cursor = null; + try + { + cursor = getDeliveryDb().openCursor(null, null); + + DatabaseEntry key = new DatabaseEntry(); + + QueueEntryKey dd = new QueueEntryKey(queueId, 0); + + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + keyBinding.objectToEntry(dd, key); + + DatabaseEntry value = new DatabaseEntry(); + + LinkedList<Long> messageIds = new LinkedList<Long>(); + + OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); + dd = keyBinding.entryToObject(key); + + while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId)) + { + + messageIds.add(dd.getMessageId()); + status = cursor.getNext(key, value, LockMode.DEFAULT); + if (status == OperationStatus.SUCCESS) + { + dd = keyBinding.entryToObject(key); + } + } + + return messageIds; + } + catch (DatabaseException e) + { + throw new StoreException("Database error: " + e.getMessage(), e); + } + finally + { + closeCursorSafely(cursor); + } + } + + /** + * Return a valid, currently unused message id. + * + * @return A fresh message id. + */ + public long getNewMessageId() + { + return _messageId.incrementAndGet(); + } + + /** + * Stores a chunk of message data. + * + * @param tx The transaction for the operation. + * @param messageId The message to store the data for. + * @param offset The offset of the data chunk in the message. + * @param contentBody The content of the data chunk. + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, + ByteBuffer contentBody) throws StoreException + { + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding messageBinding = ContentBinding.getInstance(); + messageBinding.objectToEntry(contentBody.array(), value); + try + { + OperationStatus status = getMessageContentDb().put(tx, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error adding content for message id " + messageId + ": " + status); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); + + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); + } + } + + /** + * Stores message meta-data. + * + * @param tx The transaction for the operation. + * @param messageId The message to store the data for. + * @param messageMetaData The message meta data to store. + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, + StorableMessageMetaData messageMetaData) + throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("storeMetaData called for transaction " + tx + + ", messageId " + messageId + + ", messageMetaData " + messageMetaData); + } + + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + + MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); + messageBinding.objectToEntry(messageMetaData, value); + try + { + getMessageMetaDataDb().put(tx, key, value); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e); + } + } + + /** + * Retrieves message meta-data. + * + * @param messageId The message to get the meta-data for. + * + * @return The message meta data. + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + public StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = " + + messageId + "): called"); + } + + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); + + try + { + OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Metadata not found for message with id " + messageId); + } + + StorableMessageMetaData mdd = messageBinding.entryToObject(value); + + return mdd; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e); + } + } + + /** + * Fills the provided ByteBuffer with as much content for the specified message as possible, starting + * from the specified offset in the message. + * + * @param messageId The message to get the data for. + * @param offset The offset of the data within the message. + * @param dst The destination of the content read back + * + * @return The number of bytes inserted into the destination + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + public int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException + { + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding contentTupleBinding = ContentBinding.getInstance(); + + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); + } + + try + { + + int written = 0; + OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); + if (status == OperationStatus.SUCCESS) + { + byte[] dataAsBytes = contentTupleBinding.entryToObject(value); + int size = dataAsBytes.length; + if (offset > size) + { + throw new RuntimeException("Offset " + offset + " is greater than message size " + size + + " for message id " + messageId + "!"); + + } + + written = size - offset; + if(written > dst.remaining()) + { + written = dst.remaining(); + } + + dst.put(dataAsBytes, offset, written); + } + return written; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); + } + } + + @Override + public boolean isPersistent() + { + return true; } @Override - protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException + @SuppressWarnings("unchecked") + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + { + if(metaData.isPersistent()) + { + return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData); + } + else + { + return new StoredMemoryMessage(getNewMessageId(), metaData); + } + } + + /** + * Makes the specified configured object persistent. + * + * @param configuredObject Details of the configured object to store. + * @throws StoreException If the operation fails for any reason. + */ + private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException + { + if (_stateManager.isInState(State.ACTIVE)) + { + LOGGER.debug("Storing configured object: " + configuredObject); + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); + keyBinding.objectToEntry(configuredObject.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); + + queueBinding.objectToEntry(configuredObject, value); + try + { + OperationStatus status = getConfiguredObjectsDb().put(null, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error writing configured object " + configuredObject + " to database: " + + status); + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject + + " to database: " + e.getMessage(), e); + } + } + } + + private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws StoreException + { + + LOGGER.debug("Removing configured object: " + id); + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(id, key); + try + { + return getConfiguredObjectsDb().delete(tx, key); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error deleting of configured object with id " + id + " from database", e); + } + } + + + + private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData> + { + + private final long _messageId; + private final boolean _isRecovered; + + private StorableMessageMetaData _metaData; + private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + + private byte[] _data; + private volatile SoftReference<byte[]> _dataRef; + + StoredBDBMessage(long messageId, StorableMessageMetaData metaData) + { + this(messageId, metaData, false); + } + + StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered) + { + _messageId = messageId; + _isRecovered = isRecovered; + + if(!_isRecovered) + { + _metaData = metaData; + } + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); + } + + public StorableMessageMetaData getMetaData() + { + StorableMessageMetaData metaData = _metaDataRef.get(); + if(metaData == null) + { + metaData = BDBMessageStore.this.getMessageMetaData(_messageId); + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); + } + + return metaData; + } + + public long getMessageNumber() + { + return _messageId; + } + + public void addContent(int offsetInMessage, java.nio.ByteBuffer src) + { + src = src.slice(); + + if(_data == null) + { + _data = new byte[src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + src.duplicate().get(_data); + } + else + { + byte[] oldData = _data; + _data = new byte[oldData.length + src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + + System.arraycopy(oldData,0,_data,0,oldData.length); + src.duplicate().get(_data, oldData.length, src.remaining()); + } + + } + + public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) + { + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) + { + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; + } + else + { + return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); + } + } + + public ByteBuffer getContent(int offsetInMessage, int size) + { + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) + { + return ByteBuffer.wrap(data,offsetInMessage,size); + } + else + { + ByteBuffer buf = ByteBuffer.allocate(size); + int length = getContent(offsetInMessage, buf); + buf.limit(length); + buf.position(0); + return buf; + } + } + + synchronized void store(com.sleepycat.je.Transaction txn) + { + if (!stored()) + { + try + { + _dataRef = new SoftReference<byte[]>(_data); + BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); + BDBMessageStore.this.addContent(txn, _messageId, 0, + _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + } + finally + { + _metaData = null; + _data = null; + } + } + } + + public synchronized StoreFuture flushToStore() + { + if(!stored()) + { + com.sleepycat.je.Transaction txn; + try + { + txn = _environmentFacade.getEnvironment().beginTransaction( + null, null); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("failed to begin transaction", e); + } + store(txn); + _environmentFacade.commit(txn); + _committer.commit(txn, true); + + storedSizeChangeOccured(getMetaData().getContentSize()); + } + return StoreFuture.IMMEDIATE_FUTURE; + } + + public void remove() + { + int delta = getMetaData().getContentSize(); + BDBMessageStore.this.removeMessage(_messageId, false); + storedSizeChangeOccured(-delta); + } + + private boolean stored() + { + return _metaData == null || _isRecovered; + } + } + + private class BDBTransaction implements org.apache.qpid.server.store.Transaction + { + private com.sleepycat.je.Transaction _txn; + private int _storeSizeIncrease; + + private BDBTransaction() throws StoreException + { + try + { + _txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e); + } + } + + public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + { + if(message.getStoredMessage() instanceof StoredBDBMessage) + { + final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + + BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); + } + + public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + { + BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); + } + + public void commitTran() throws StoreException + { + BDBMessageStore.this.commitTranImpl(_txn, true); + BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease); + } + + public StoreFuture commitTranAsync() throws StoreException + { + BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease); + return BDBMessageStore.this.commitTranImpl(_txn, false); + } + + public void abortTran() throws StoreException + { + BDBMessageStore.this.abortTran(_txn); + } + + public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException + { + BDBMessageStore.this.removeXid(_txn, format, globalId, branchId); + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, + Record[] dequeues) throws StoreException + { + BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); + } + } + + private void storedSizeChangeOccured(final int delta) throws StoreException { try { - tx.commitNoSync(); + storedSizeChange(delta); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Stored size change exception", e); } - catch(DatabaseException de) + } + + private void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) { - LOGGER.error("Got DatabaseException on commit, closing environment", de); + synchronized (this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 2*delta; + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + _totalStoreSize = getSizeOnDisk(); + + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + _totalStoreSize = getSizeOnDisk(); + + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(); - closeEnvironmentSafely(); + _totalStoreSize = getSizeOnDisk(); - throw de; + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + } + } + + private void reduceSizeOnDisk() + { + _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); + boolean cleaned = false; + while (_environmentFacade.getEnvironment().cleanLog() > 0) + { + cleaned = true; + } + if (cleaned) + { + CheckpointConfig force = new CheckpointConfig(); + force.setForce(true); + _environmentFacade.getEnvironment().checkpoint(force); } - return _commitThreadWrapper.commit(tx, syncCommit); + + _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); + } + + private long getSizeOnDisk() + { + return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize(); + } + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + + + @Override + public void onDelete() + { + String storeLocation = getStoreLocation(); + + if (storeLocation != null) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting store " + storeLocation); + } + + File location = new File(storeLocation); + if (location.exists()) + { + if (!FileUtils.delete(location, true)) + { + LOGGER.error("Cannot delete " + storeLocation); + } + } + } } @Override public String getStoreType() { - return TYPE; + return _type; + } + + private Database getMessageContentDb() + { + return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME); + } + + private Database getConfiguredObjectsDb() + { + return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); + } + + private Database getConfigVersionDb() + { + return _environmentFacade.getOpenDatabase(CONFIG_VERSION_DB_NAME); + } + + private Database getMessageMetaDataDb() + { + return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME); + } + + private Database getDeliveryDb() + { + return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME); + } + + private Database getXidDb() + { + return _environmentFacade.getOpenDatabase(XID_DB_NAME); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index d7c8b23d39..4abe81c56c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; @@ -37,7 +38,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi @Override public String getType() { - return BDBMessageStore.TYPE; + return StandardEnvironmentFacade.TYPE; } @Override @@ -71,7 +72,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi if(initialSize != 0) { - return Collections.singletonMap("bdbEnvironmentConfig", (Object)attributes); + return Collections.singletonMap(BDBMessageStore.ENVIRONMENT_CONFIGURATION, (Object)attributes); } else { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java new file mode 100644 index 0000000000..a137e38baf --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -0,0 +1,313 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreFuture; + +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.Transaction; + +public class CoalescingCommiter implements Committer +{ + private final CommitThread _commitThread; + + public CoalescingCommiter(String name, EnvironmentFacade environmentFacade) + { + _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade); + } + + @Override + public void start() + { + _commitThread.start(); + } + + @Override + public void stop() + { + _commitThread.close(); + try + { + _commitThread.join(); + } + catch (InterruptedException ie) + { + Thread.currentThread().interrupt(); + throw new RuntimeException("Commit thread has not shutdown", ie); + } + } + + @Override + public StoreFuture commit(Transaction tx, boolean syncCommit) + { + BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + commitFuture.commit(); + return commitFuture; + } + + private static final class BDBCommitFuture implements StoreFuture + { + private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + + private final CommitThread _commitThread; + private final Transaction _tx; + private final boolean _syncCommit; + private RuntimeException _databaseException; + private boolean _complete; + + public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + { + _commitThread = commitThread; + _tx = tx; + _syncCommit = syncCommit; + } + + public synchronized void complete() + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("complete() called for transaction " + _tx); + } + _complete = true; + + notifyAll(); + } + + public synchronized void abort(RuntimeException databaseException) + { + _complete = true; + _databaseException = databaseException; + + notifyAll(); + } + + public void commit() throws DatabaseException + { + _commitThread.addJob(this, _syncCommit); + + if(!_syncCommit) + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("CommitAsync was requested, returning immediately."); + } + return; + } + + waitForCompletion(); + + if (_databaseException != null) + { + throw _databaseException; + } + + } + + public synchronized boolean isComplete() + { + return _complete; + } + + public synchronized void waitForCompletion() + { + long startTime = 0; + if(LOGGER.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } + + while (!isComplete()) + { + _commitThread.explicitNotify(); + try + { + wait(250); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } + } + + /** + * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before + * continuing, but it is the responsibility of this thread to tell the commit operations when they have been + * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> + */ + private static class CommitThread extends Thread + { + private static final Logger LOGGER = Logger.getLogger(CommitThread.class); + + private final AtomicBoolean _stopped = new AtomicBoolean(false); + private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Object _lock = new Object(); + private final EnvironmentFacade _environmentFacade; + + public CommitThread(String name, EnvironmentFacade environmentFacade) + { + super(name); + _environmentFacade = environmentFacade; + } + + public void explicitNotify() + { + synchronized (_lock) + { + _lock.notify(); + } + } + + public void run() + { + while (!_stopped.get()) + { + synchronized (_lock) + { + while (!_stopped.get() && !hasJobs()) + { + try + { + // Periodically wake up and check, just in case we + // missed a notification. Don't want to lock the broker hard. + _lock.wait(1000); + } + catch (InterruptedException e) + { + } + } + } + processJobs(); + } + } + + private void processJobs() + { + int size = _jobQueue.size(); + + try + { + long startTime = 0; + if(LOGGER.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } + + Environment environment = _environmentFacade.getEnvironment(); + if (environment != null && environment.isValid()) + { + environment.flushLog(true); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("flushLog completed in " + duration + " ms"); + } + + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.complete(); + } + + } + catch (DatabaseException e) + { + try + { + LOGGER.error("Exception during environment log flush", e); + + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.abort(e); + } + } + finally + { + LOGGER.error("Closing store environment", e); + + try + { + _environmentFacade.close(); + } + catch (DatabaseException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + } + } + } + + private boolean hasJobs() + { + return !_jobQueue.isEmpty(); + } + + public void addJob(BDBCommitFuture commit, final boolean sync) + { + if (_stopped.get()) + { + throw new IllegalStateException("Commit thread is stopped"); + } + _jobQueue.add(commit); + if(sync) + { + synchronized (_lock) + { + _lock.notifyAll(); + } + } + } + + public void close() + { + RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted"); + synchronized (_lock) + { + _stopped.set(true); + BDBCommitFuture commit = null; + while ((commit = _jobQueue.poll()) != null) + { + commit.abort(e); + } + _lock.notifyAll(); + } + } + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java index 7f7b65f315..36ee2ad306 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,27 +20,36 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.server.store.StoreFuture; -import static org.mockito.Mockito.mock; +import com.sleepycat.je.Transaction; -public class HAMessageStoreSmokeTest extends QpidTestCase +public interface Committer { - private final BDBHAMessageStore _store = new BDBHAMessageStore(); + void start(); + + StoreFuture commit(Transaction tx, boolean syncCommit); + + void stop(); - public void testMissingHAConfigThrowsException() throws Exception + Committer IMMEDIATE_FUTURE_COMMITTER = new Committer() { - try + + @Override + public void start() { - _store.configure(mock(VirtualHost.class)); - fail("Expected an exception to be thrown"); } - catch (ServerScopedRuntimeException ce) + + @Override + public StoreFuture commit(Transaction tx, boolean syncCommit) { - assertTrue(ce.getMessage().contains("BDB HA configuration key not found")); + return StoreFuture.IMMEDIATE_FUTURE; } - } -} + + @Override + public void stop() + { + } + }; + +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java new file mode 100644 index 0000000000..144ab83238 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; + +public interface EnvironmentFacade +{ + @SuppressWarnings("serial") + final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() + {{ + put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); + // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84 + put(EnvironmentConfig.STATS_COLLECT, "false"); + }}); + + Environment getEnvironment(); + + Committer createCommitter(String name); + + void openDatabases(DatabaseConfig dbConfig, String... databaseNames); + + Database getOpenDatabase(String name); + + void commit(com.sleepycat.je.Transaction tx); + + DatabaseException handleDatabaseException(String contextMessage, DatabaseException e); + + String getStoreLocation(); + + void close(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index 59483751ca..b784e436b9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,13 +18,15 @@ * under the License. * */ -package org.apache.qpid.server.store; +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.model.VirtualHost; -public interface HAMessageStore extends MessageStore +public interface EnvironmentFacadeFactory { - /** - * Used to indicate that a store requires to make itself unavailable for read and read/write - * operations. - */ - void passivate(); + + EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore); + + String getType(); + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java new file mode 100644 index 0000000000..b13766a136 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.log4j.Logger; + +import com.sleepycat.je.ExceptionEvent; +import com.sleepycat.je.ExceptionListener; + +public class LoggingAsyncExceptionListener implements ExceptionListener +{ + private static final Logger LOGGER = Logger.getLogger(LoggingAsyncExceptionListener.class); + + @Override + public void exceptionThrown(ExceptionEvent event) + { + LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException()); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java new file mode 100644 index 0000000000..8117ca1a9a --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -0,0 +1,228 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Logger; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; + +public class StandardEnvironmentFacade implements EnvironmentFacade +{ + private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class); + public static final String TYPE = "BDB"; + + private final String _storePath; + private final Map<String, Database> _databases = new HashMap<String, Database>(); + + private Environment _environment; + + public StandardEnvironmentFacade(String storePath, Map<String, String> attributes) + { + _storePath = storePath; + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Creating environment at environment path " + _storePath); + } + + File environmentPath = new File(storePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + + for (Map.Entry<String, String> configItem : attributes.entrySet()) + { + LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + + envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); + + _environment = new Environment(environmentPath, envConfig); + } + + @Override + public void commit(com.sleepycat.je.Transaction tx) + { + try + { + tx.commitNoSync(); + } + catch (DatabaseException de) + { + LOGGER.error("Got DatabaseException on commit, closing environment", de); + + closeEnvironmentSafely(); + + throw handleDatabaseException("Got DatabaseException on commit", de); + } + } + + @Override + public void close() + { + closeDatabases(); + closeEnvironment(); + } + + private void closeDatabases() + { + RuntimeException firstThrownException = null; + for (Database database : _databases.values()) + { + try + { + database.close(); + } + catch(RuntimeException e) + { + if (firstThrownException == null) + { + firstThrownException = e; + } + } + } + if (firstThrownException != null) + { + throw firstThrownException; + } + } + + private void closeEnvironmentSafely() + { + if (_environment != null) + { + if (_environment.isValid()) + { + try + { + closeDatabases(); + } + catch(Exception e) + { + LOGGER.error("Exception closing environment databases", e); + } + } + try + { + _environment.close(); + } + catch (DatabaseException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + catch (IllegalStateException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + finally + { + _environment = null; + } + } + } + + @Override + public Environment getEnvironment() + { + return _environment; + } + + private void closeEnvironment() + { + if (_environment != null) + { + // Clean the log before closing. This makes sure it doesn't contain + // redundant data. Closing without doing this means the cleaner may + // not get a chance to finish. + try + { + _environment.cleanLog(); + } + finally + { + _environment.close(); + _environment = null; + } + } + } + + @Override + public DatabaseException handleDatabaseException(String contextMessage, DatabaseException e) + { + if (_environment != null && !_environment.isValid()) + { + closeEnvironmentSafely(); + } + return e; + } + + @Override + public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) + { + for (String databaseName : databaseNames) + { + Database database = _environment.openDatabase(null, databaseName, dbConfig); + _databases .put(databaseName, database); + } + } + + @Override + public Database getOpenDatabase(String name) + { + Database database = _databases.get(name); + if (database == null) + { + throw new IllegalArgumentException("Database with name '" + name + "' has not been opened"); + } + return database; + } + + @Override + public Committer createCommitter(String name) + { + return new CoalescingCommiter(name, this); + } + + @Override + public String getStoreLocation() + { + return _storePath; + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java new file mode 100644 index 0000000000..384ceba98a --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.model.VirtualHost; + +public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory +{ + + @SuppressWarnings("unchecked") + @Override + public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) + { + Map<String, String> envConfigMap = new HashMap<String, String>(); + envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); + + Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION); + if (environmentConfigurationAttributes instanceof Map) + { + envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes); + } + + String name = virtualHost.getName(); + final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name; + + String storeLocation; + if(isMessageStore) + { + storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + } + else // we are acting only as the durable config store + { + storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + } + + return new StandardEnvironmentFacade(storeLocation, envConfigMap); + } + + @Override + public String getType() + { + return StandardEnvironmentFacade.TYPE; + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java new file mode 100644 index 0000000000..38fdf34196 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; + +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Transaction; + +public class DatabasePinger +{ + public static final String PING_DATABASE_NAME = "PINGDB"; + private static final int ID = 0; + + public void pingDb(EnvironmentFacade facade) + { + try + { + final Database db = facade.getOpenDatabase(PING_DATABASE_NAME); + + DatabaseEntry key = new DatabaseEntry(); + IntegerBinding.intToEntry(ID, key); + + DatabaseEntry value = new DatabaseEntry(); + LongBinding.longToEntry(System.currentTimeMillis(), value); + Transaction txn = null; + try + { + txn = facade.getEnvironment().beginTransaction(null, null); + db.put(txn, key, value); + txn.commit(); + txn = null; + } + finally + { + try + { + if (txn != null) + { + txn.abort(); + } + } + finally + { + db.close(); + } + } + } + catch (DatabaseException de) + { + facade.handleDatabaseException("DatabaseException from DatabasePinger ", de); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java new file mode 100644 index 0000000000..76a48c189e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.util.Map; + +public interface ReplicatedEnvironmentConfiguration +{ + String getName(); + String getGroupName(); + String getHostPort(); + String getHelperHostPort(); + String getDurability(); + boolean isCoalescingSync(); + boolean isDesignatedPrimary(); + int getPriority(); + int getQuorumOverride(); + String getStorePath(); + Map<String, String> getParameters(); + Map<String, String> getReplicationParameters(); +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java new file mode 100644 index 0000000000..02181611c2 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -0,0 +1,1052 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; +import org.apache.qpid.server.store.berkeleydb.Committer; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; +import org.apache.qpid.server.util.DaemonThreadFactory; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.EnvironmentFailureException; +import com.sleepycat.je.Transaction; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.InsufficientReplicasException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.RepInternal; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationMutableConfig; +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.RestartRequiredException; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.DbPing; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; +import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException; +import com.sleepycat.je.rep.vlsn.VLSNRange; +import com.sleepycat.je.utilint.PropUtil; +import com.sleepycat.je.utilint.VLSN; + +public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener +{ + public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; + public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval"; + + private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); + + private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000; + private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000; + + private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); + private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); + + @SuppressWarnings("serial") + private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() + {{ + /** + * Parameter decreased as the 24h default may lead very large log files for most users. + */ + put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h"); + /** + * Parameter increased as the 5 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s"); + /** + * Parameter increased as the 10 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s"); + /** + * Parameter decreased as the 10 h default may cause user confusion. + */ + put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); + /** + * Parameter changed from default (off) to allow the Environment to start in the + * UNKNOWN state when the majority is not available. + */ + put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s"); + /** + * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False + * is scheduled to become default after JE 5.1. + */ + put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString()); + /** + * Parameter decreased as a default 5min interval may lead to bigger data losses on Node + * with NO_SYN durability in case if such Node crushes. + */ + put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); + }}); + + public static final String TYPE = "BDB-HA"; + + // TODO: JMX will change to observe the model, at that point these names will disappear + public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; + public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + + private final ReplicatedEnvironmentConfiguration _configuration; + private final Durability _durability; + private final Boolean _coalescingSync; + private final String _prettyGroupNodeName; + private final File _environmentDirectory; + + private final ExecutorService _environmentJobExecutor; + private final ScheduledExecutorService _groupChangeExecutor; + private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); + private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>(); + private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); + + private volatile ReplicatedEnvironment _environment; + private volatile long _joinTime; + private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; + + public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) + { + _environmentDirectory = new File(configuration.getStorePath()); + if (!_environmentDirectory.exists()) + { + if (!_environmentDirectory.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + + _configuration = configuration; + + _durability = Durability.parse(_configuration.getDurability()); + _coalescingSync = _configuration.isCoalescingSync(); + _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName(); + + // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread + _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); + _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); + _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), 100, TimeUnit.MILLISECONDS); // TODO make configurable + + // create environment in a separate thread to avoid renaming of the current thread by JE + _environment = createEnvironment(true); + } + + @Override + public void commit(final Transaction tx) + { + try + { + // Using commit() instead of commitNoSync() for the HA store to allow + // the HA durability configuration to influence resulting behaviour. + tx.commit(); + } + catch (DatabaseException de) + { + throw handleDatabaseException("Got DatabaseException on commit, closing environment", de); + } + } + + @Override + public void close() + { + if (_state.compareAndSet(State.OPENING, State.CLOSING) || + _state.compareAndSet(State.OPEN, State.CLOSING) || + _state.compareAndSet(State.RESTARTING, State.CLOSING) ) + { + try + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName); + } + + _environmentJobExecutor.shutdown(); + _groupChangeExecutor.shutdown(); + closeDatabases(); + closeEnvironment(); + } + finally + { + _state.compareAndSet(State.CLOSING, State.CLOSED); + } + } + } + + @Override + public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe) + { + boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException); + if (restart) + { + tryToRestartEnvironment(dbe); + } + return dbe; + } + + private void tryToRestartEnvironment(final DatabaseException dbe) + { + if (_state.compareAndSet(State.OPEN, State.RESTARTING)) + { + if (dbe != null && LOGGER.isDebugEnabled()) + { + LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe); + } + + _environmentJobExecutor.execute(new Runnable() + { + @Override + public void run() + { + try + { + restartEnvironment(); + } + catch (Exception e) + { + LOGGER.error("Exception on environment restart", e); + } + } + }); + + } + else + { + LOGGER.info("Cannot restart environment because of facade state: " + _state.get()); + } + } + + @Override + public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) + { + if (_state.get() != State.OPEN) + { + throw new IllegalStateException("Environment facade is not in opened state"); + } + + if (!_environment.isValid()) + { + throw new IllegalStateException("Environment is not valid"); + } + + if (_environment.getState() != ReplicatedEnvironment.State.MASTER) + { + throw new IllegalStateException("Databases can only be opened on Master node"); + } + + for (String databaseName : databaseNames) + { + _databases.put(databaseName, new DatabaseHolder(dbConfig)); + } + for (String databaseName : databaseNames) + { + DatabaseHolder holder = _databases.get(databaseName); + openDatabaseInternally(databaseName, holder); + } + } + + private void openDatabaseInternally(String databaseName, DatabaseHolder holder) + { + Database database = _environment.openDatabase(null, databaseName, holder.getConfig()); + holder.setDatabase(database); + } + + @Override + public Database getOpenDatabase(String name) + { + if (_state.get() != State.OPEN) + { + throw new IllegalStateException("Environment facade is not in opened state"); + } + + if (!_environment.isValid()) + { + throw new IllegalStateException("Environment is not valid"); + } + DatabaseHolder databaseHolder = _databases.get(name); + if (databaseHolder == null) + { + throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened"); + } + Database database = databaseHolder.getDatabase(); + if (database == null) + { + throw new IllegalArgumentException("Database with name '" + name + "' has not been opened"); + } + return database; + } + + @Override + public String getStoreLocation() + { + return _environmentDirectory.getAbsolutePath(); + } + + @Override + public void stateChange(final StateChangeEvent stateChangeEvent) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + stateChangeEvent.getState()); + } + + if (_state.get() != State.CLOSING && _state.get() != State.CLOSED) + { + _groupChangeExecutor.submit(new Runnable() + { + @Override + public void run() + { + stateChanged(stateChangeEvent); + } + }); + } + else + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Ignoring the state environment change event as the environment facade for node '" + _prettyGroupNodeName + + "' is in state " + _state.get()); + } + } + } + + private void stateChanged(StateChangeEvent stateChangeEvent) + { + ReplicatedEnvironment.State state = stateChangeEvent.getState(); + + if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER) + { + if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN)) + { + LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); + _joinTime = System.currentTimeMillis(); + } + } + + if (state == ReplicatedEnvironment.State.MASTER) + { + reopenDatabases(); + } + + StateChangeListener listener = _stateChangeListener.get(); + if (listener != null) + { + listener.stateChange(stateChangeEvent); + } + + if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN) + { + tryToRestartEnvironment(null); + } + _lastKnownEnvironmentState = state; + } + + private void reopenDatabases() + { + DatabaseConfig pingDbConfig = new DatabaseConfig(); + pingDbConfig.setTransactional(true); + pingDbConfig.setAllowCreate(true); + + _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig)); + + for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) + { + openDatabaseInternally(entry.getKey(), entry.getValue()); + } + } + + public String getGroupName() + { + return (String)_configuration.getGroupName(); + } + + public String getNodeName() + { + return _configuration.getName(); + } + + public String getHostPort() + { + return (String)_configuration.getHostPort(); + } + + public String getHelperHostPort() + { + return (String)_configuration.getHelperHostPort(); + } + + public String getDurability() + { + return _durability.toString(); + } + + public boolean isCoalescingSync() + { + return _coalescingSync; + } + + public String getNodeState() + { + if (_state.get() != State.OPEN) + { + return ReplicatedEnvironment.State.UNKNOWN.name(); + } + ReplicatedEnvironment.State state = _environment.getState(); + return state.toString(); + } + + public boolean isDesignatedPrimary() + { + if (_state.get() != State.OPEN) + { + throw new IllegalStateException("Environment facade is not opened"); + } + return _environment.getRepMutableConfig().getDesignatedPrimary(); + } + + public Future<Void> setDesignatedPrimary(final boolean isPrimary) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Submitting a job to set designated primary on " + _prettyGroupNodeName + " to " + isPrimary); + } + + return _environmentJobExecutor.submit(new Callable<Void>() + { + @Override + public Void call() + { + setDesignatedPrimaryInternal(isPrimary); + return null; + } + }); + } + + void setDesignatedPrimaryInternal(final boolean isPrimary) + { + try + { + final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); + _environment.setRepMutableConfig(newConfig); + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Node " + _prettyGroupNodeName + " successfully set designated primary : " + isPrimary); + } + } + catch (Exception e) + { + LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e); + } + } + + int getPriority() + { + if (_state.get() != State.OPEN) + { + throw new IllegalStateException("Environment facade is not opened"); + } + ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); + return repConfig.getNodePriority(); + } + + public Future<Void> setPriority(final int priority) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Submitting a job to set priority on " + _prettyGroupNodeName + " to " + priority); + } + + return _environmentJobExecutor.submit(new Callable<Void>() + { + @Override + public Void call() + { + setPriorityInternal(priority); + return null; + } + }); + } + + void setPriorityInternal(int priority) + { + try + { + final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority); + _environment.setRepMutableConfig(newConfig); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority); + } + } + catch (Exception e) + { + LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e); + } + } + + int getElectableGroupSizeOverride() + { + if (_state.get() != State.OPEN) + { + throw new IllegalStateException("Environment facade is not opened"); + } + ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); + return repConfig.getElectableGroupSizeOverride(); + } + + public Future<Void> setElectableGroupSizeOverride(final int electableGroupOverride) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Submitting a job to set electable group override on " + _prettyGroupNodeName + " to " + electableGroupOverride); + } + + return _environmentJobExecutor.submit(new Callable<Void>() + { + @Override + public Void call() + { + setElectableGroupSizeOverrideInternal(electableGroupOverride); + return null; + } + }); + } + + void setElectableGroupSizeOverrideInternal(int electableGroupOverride) + { + try + { + final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride); + _environment.setRepMutableConfig(newConfig); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride); + } + } + catch (Exception e) + { + LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e); + } + } + + + public long getJoinTime() + { + return _joinTime ; + } + + public long getLastKnownReplicationTransactionId() + { + if (_state.get() == State.OPEN) + { + VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange(); + VLSN lastTxnEnd = range.getLastTxnEnd(); + return lastTxnEnd.getSequence(); + } + else + { + return -1L; + } + } + + public List<Map<String, String>> getGroupMembers() + { + List<Map<String, String>> members = new ArrayList<Map<String, String>>(); + + for (ReplicationNode node : _environment.getGroup().getNodes()) + { + Map<String, String> nodeMap = new HashMap<String, String>(); + nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, node.getName()); + nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); + members.add(nodeMap); + } + + return members; + } + + public void removeNodeFromGroup(final String nodeName) + { + createReplicationGroupAdmin().removeMember(nodeName); + } + + public void updateAddress(final String nodeName, final String newHostName, final int newPort) + { + createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); + } + + private ReplicationGroupAdmin createReplicationGroupAdmin() + { + final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); + helpers.addAll(_environment.getRepConfig().getHelperSockets()); + + final ReplicationConfig repConfig = _environment.getRepConfig(); + helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); + + return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers); + } + + + public ReplicatedEnvironment getEnvironment() + { + return _environment; + } + + public State getFacadeState() + { + return _state.get(); + } + + public void setStateChangeListener(StateChangeListener stateChangeListener) + { + if (_stateChangeListener.compareAndSet(null, stateChangeListener)) + { + _environment.setStateChangeListener(this); + } + else + { + throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName); + } + } + + private void closeEnvironment() + { + // Clean the log before closing. This makes sure it doesn't contain + // redundant data. Closing without doing this means the cleaner may not + // get a chance to finish. + try + { + if (_environment.isValid()) + { + _environment.cleanLog(); + } + } + finally + { + _environment.close(); + _environment = null; + } + } + + private void restartEnvironment() + { + LOGGER.info("Restarting environment"); + + closeEnvironmentSafely(); + + _environment = createEnvironment(false); + + if (_stateChangeListener.get() != null) + { + _environment.setStateChangeListener(this); + } + + LOGGER.info("Environment is restarted"); + } + + private void closeEnvironmentSafely() + { + Environment environment = _environment; + if (environment != null) + { + try + { + if (environment.isValid()) + { + try + { + closeDatabases(); + } + catch(Exception e) + { + LOGGER.warn("Ignoring an exception whilst closing databases", e); + } + } + environment.close(); + } + catch (EnvironmentFailureException efe) + { + LOGGER.warn("Ignoring an exception whilst closing environment", efe); + } + } + } + + private void closeDatabases() + { + RuntimeException firstThrownException = null; + for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) + { + DatabaseHolder databaseHolder = entry.getValue(); + Database database = databaseHolder.getDatabase(); + if (database != null) + { + try + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName); + } + + database.close(); + } + catch(RuntimeException e) + { + LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e); + if (firstThrownException == null) + { + firstThrownException = e; + } + } + finally + { + databaseHolder.setDatabase(null); + } + } + } + if (firstThrownException != null) + { + throw firstThrownException; + } + } + + private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread) + { + String groupName = _configuration.getGroupName(); + String helperHostPort = _configuration.getHelperHostPort(); + String hostPort = _configuration.getHostPort(); + Map<String, String> environmentParameters = _configuration.getParameters(); + Map<String, String> replicationEnvironmentParameters = _configuration.getReplicationParameters(); + boolean designatedPrimary = _configuration.isDesignatedPrimary(); + int priority = _configuration.getPriority(); + int quorumOverride = _configuration.getQuorumOverride(); + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Creating environment"); + LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath()); + LOGGER.info("Group name " + groupName); + LOGGER.info("Node name " + _configuration.getName()); + LOGGER.info("Node host port " + hostPort); + LOGGER.info("Helper host port " + helperHostPort); + LOGGER.info("Durability " + _durability); + LOGGER.info("Coalescing sync " + _coalescingSync); + LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); + LOGGER.info("Node priority " + priority); + LOGGER.info("Quorum override " + quorumOverride); + } + + Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS); + if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty()) + { + replicationEnvironmentSettings.putAll(replicationEnvironmentParameters); + } + Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS); + if (environmentParameters != null && !environmentParameters.isEmpty()) + { + environmentSettings.putAll(environmentParameters); + } + + ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort); + replicationConfig.setHelperHosts(helperHostPort); + replicationConfig.setDesignatedPrimary(designatedPrimary); + replicationConfig.setNodePriority(priority); + replicationConfig.setElectableGroupSizeOverride(quorumOverride); + + for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet()) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + } + replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); + envConfig.setDurability(_durability); + + for (Map.Entry<String, String> configItem : environmentSettings.entrySet()) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + } + envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + + if (createEnvironmentInSeparateThread) + { + return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig); + } + else + { + return createEnvironment(_environmentDirectory, envConfig, replicationConfig); + } + } + + private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig, + final ReplicationConfig replicationConfig) + { + Future<ReplicatedEnvironment> environmentFuture = _environmentJobExecutor.submit(new Callable<ReplicatedEnvironment>(){ + @Override + public ReplicatedEnvironment call() throws Exception + { + String originalThreadName = Thread.currentThread().getName(); + try + { + return createEnvironment(environmentPathFile, envConfig, replicationConfig); + } + finally + { + Thread.currentThread().setName(originalThreadName); + } + }}); + + long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT)); + try + { + return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException("Environment creation was interrupted", e); + } + catch (ExecutionException e) + { + throw new RuntimeException("Unexpected exception on environment creation", e.getCause()); + } + catch (TimeoutException e) + { + throw new RuntimeException("JE environment has not been created in due time"); + } + } + + private ReplicatedEnvironment createEnvironment(File environmentPathFile, EnvironmentConfig envConfig, + final ReplicationConfig replicationConfig) + { + ReplicatedEnvironment environment = null; + try + { + environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); + } + catch (final InsufficientLogException ile) + { + LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); + NetworkRestore restore = new NetworkRestore(); + NetworkRestoreConfig config = new NetworkRestoreConfig(); + config.setRetainLogFiles(false); + restore.execute(ile, config); + environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); + } + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Environment is created for node " + _prettyGroupNodeName); + } + return environment; + } + + @Override + public Committer createCommitter(String name) + { + if (_coalescingSync) + { + return new CoalescingCommiter(name, this); + } + else + { + return Committer.IMMEDIATE_FUTURE_COMMITTER; + } + } + + public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException + { + if (repNode == null) + { + throw new IllegalArgumentException("Node cannot be null"); + } + return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState(); + } + + // For testing only + int getNumberOfElectableGroupMembers() + { + if (_state.get() != State.OPEN) + { + throw new IllegalStateException("Environment facade is not opened"); + } + return _environment.getGroup().getElectableNodes().size(); + } + + private class RemoteNodeStateLearner implements Callable<Void> + { + private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap(); + @Override + public Void call() + { + final Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>(); + try + { + Set<Future<Void>> futures = new HashSet<Future<Void>>(); + + for (final ReplicationNode node : _environment.getGroup().getElectableNodes()) + { + Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>() + { + @Override + public Void call() + { + DbPing ping = new DbPing(node, _configuration.getGroupName(), REMOTE_NODE_MONITOR_INTERVAL); + ReplicatedEnvironment.State nodeState; + try + { + nodeState = ping.getNodeState().getNodeState(); + } + catch (IOException e) + { + nodeState = ReplicatedEnvironment.State.UNKNOWN; + } + catch (ServiceConnectFailedException e) + { + nodeState = ReplicatedEnvironment.State.UNKNOWN; + } + + currentGroupState.put(node.getName(), nodeState); + return null; + } + }); + futures.add(future); + } + + for (Future<Void> future : futures) + { + try + { + future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause()); + } + catch (TimeoutException e) + { + LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName()); + future.cancel(true); + } + } + + if (ReplicatedEnvironment.State.MASTER == _environment.getState()) + { + boolean stateChanged = !_previousGroupState.equals(currentGroupState); + _previousGroupState = currentGroupState; + if (stateChanged && State.OPEN == _state.get()) + { + new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this); + } + } + } + finally + { + _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS); + } + return null; + } + } + public static enum State + { + OPENING, + OPEN, + RESTARTING, + CLOSING, + CLOSED + } + + private static class DatabaseHolder + { + private final DatabaseConfig _config; + private Database _database; + + public DatabaseHolder(DatabaseConfig config) + { + _config = config; + } + + public Database getDatabase() + { + return _database; + } + + public void setDatabase(Database database) + { + _database = database; + } + + public DatabaseConfig getConfig() + { + return _config; + } + + @Override + public String toString() + { + return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]"; + } + + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java new file mode 100644 index 0000000000..cd53afe891 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.util.Map; + +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; + +import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.ReplicaAckPolicy; +import com.sleepycat.je.Durability.SyncPolicy; + +public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory +{ + + private static final int DEFAULT_NODE_PRIORITY = 1; + private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, + ReplicaAckPolicy.SIMPLE_MAJORITY); + private static final boolean DEFAULT_COALESCING_SYNC = true; + + + + @Override + public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore) + { + ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration() + { + @Override + public boolean isDesignatedPrimary() + { + return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false); + } + + @Override + public boolean isCoalescingSync() + { + return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC); + } + + @Override + public String getStorePath() + { + return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + } + + @Override + public Map<String, String> getParameters() + { + return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig"); + } + + @Override + public Map<String, String> getReplicationParameters() + { + return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig"); + } + + @Override + public int getQuorumOverride() + { + return 0; + } + + @Override + public int getPriority() + { + return DEFAULT_NODE_PRIORITY; + } + + + + @Override + public String getName() + { + return (String)virtualHost.getAttribute("haNodeName"); + } + + @Override + public String getHostPort() + { + return (String)virtualHost.getAttribute("haNodeAddress"); + } + + @Override + public String getHelperHostPort() + { + return (String)virtualHost.getAttribute("haHelperAddress"); + } + + @Override + public String getGroupName() + { + return (String)virtualHost.getAttribute("haGroupName"); + } + + @Override + public String getDurability() + { + return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability"); + } + }; + return new ReplicatedEnvironmentFacade(configuration); + + } + + @Override + public String getType() + { + return ReplicatedEnvironmentFacade.TYPE; + } + + private boolean convertBoolean(final Object value, boolean defaultValue) + { + if(value instanceof Boolean) + { + return (Boolean) value; + } + else if(value instanceof String) + { + return Boolean.valueOf((String) value); + } + else if(value == null) + { + return defaultValue; + } + else + { + throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Boolean"); + } + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index 4d536a2f95..7852e2d703 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -21,11 +21,13 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import com.sleepycat.je.Cursor; + import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; @@ -38,6 +40,8 @@ import com.sleepycat.je.OperationStatus; public class Upgrader { + private static final Logger LOGGER = Logger.getLogger(Upgrader.class); + static final String VERSION_DB_NAME = "DB_VERSION"; private Environment _environment; @@ -63,7 +67,8 @@ public class Upgrader if(versionDb.count() == 0L) { - int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion(); + + int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion(); DatabaseEntry key = new DatabaseEntry(); IntegerBinding.intToEntry(sourceVersion, key); DatabaseEntry value = new DatabaseEntry(); @@ -73,11 +78,17 @@ public class Upgrader } int version = getSourceVersion(versionDb); - if(version > AbstractBDBMessageStore.VERSION) + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Source message store version is " + version); + } + + if(version > BDBMessageStore.VERSION) { throw new StoreException("Database version " + version + " is higher than the most recent known version: " - + AbstractBDBMessageStore.VERSION); + + BDBMessageStore.VERSION); } performUpgradeFromVersion(version, versionDb); } @@ -124,8 +135,9 @@ public class Upgrader } void performUpgradeFromVersion(int sourceVersion, Database versionDb) + throws StoreException { - while(sourceVersion != AbstractBDBMessageStore.VERSION) + while(sourceVersion != BDBMessageStore.VERSION) { upgrade(sourceVersion, ++sourceVersion); DatabaseEntry key = new DatabaseEntry(); @@ -136,7 +148,7 @@ public class Upgrader } } - void upgrade(final int fromVersion, final int toVersion) + void upgrade(final int fromVersion, final int toVersion) throws StoreException { try { @@ -177,7 +189,7 @@ public class Upgrader private int identifyOldStoreVersion() throws DatabaseException { - int version = 0; + int version = BDBMessageStore.VERSION; for (String databaseName : _environment.getDatabaseNames()) { if (databaseName.contains("_v")) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java deleted file mode 100644 index c2b3aeab3e..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.net.InetAddress; - -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class BDBHAMessageStoreTest extends QpidTestCase -{ - private static final String TEST_LOG_FILE_MAX = "1000000"; - private static final String TEST_ELECTION_RETRIES = "1000"; - private static final String TEST_NUMBER_OF_THREADS = "10"; - private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999"; - private String _groupName; - private String _workDir; - private int _masterPort; - private String _host; - private XMLConfiguration _configXml; - private VirtualHost _virtualHost; - private org.apache.qpid.server.model.VirtualHost _modelVhost; - - public void setUp() throws Exception - { - super.setUp(); - - _workDir = TMP_FOLDER + File.separator + getName(); - _host = InetAddress.getByName("localhost").getHostAddress(); - _groupName = "group" + getName(); - _masterPort = -1; - - FileUtils.delete(new File(_workDir), true); - _configXml = new XMLConfiguration(); - _modelVhost = mock(org.apache.qpid.server.model.VirtualHost.class); - - - BrokerTestHelper.setUp(); - } - - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - FileUtils.delete(new File(_workDir), true); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testSetSystemConfiguration() throws Exception - { - // create virtual host configuration, registry and host instance - addVirtualHostConfiguration(); - String vhostName = "test" + _masterPort; - VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock()); - - _virtualHost = BrokerTestHelper.createVirtualHost(configuration,new VirtualHostRegistry(new EventLogger()),_modelVhost); - BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore(); - - // test whether JVM system settings were applied - Environment env = store.getEnvironment(); - assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS)); - assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); - - ReplicatedEnvironment repEnv = store.getReplicatedEnvironment(); - assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES, - repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES)); - assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT, - repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT)); - } - - private void addVirtualHostConfiguration() throws Exception - { - int port = findFreePort(); - if (_masterPort == -1) - { - _masterPort = port; - } - String nodeName = getNodeNameForNodeAt(port); - - String vhostName = "test" + port; - String vhostPrefix = "virtualhosts.virtualhost." + vhostName; - - _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); - _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE); - - when(_modelVhost.getAttribute(eq(_modelVhost.STORE_PATH))).thenReturn(_workDir + File.separator - + port); - when(_modelVhost.getAttribute(eq("haGroupName"))).thenReturn(_groupName); - when(_modelVhost.getAttribute(eq("haNodeName"))).thenReturn(nodeName); - when(_modelVhost.getAttribute(eq("haNodeAddress"))).thenReturn(getNodeHostPortForNodeAt(port)); - when(_modelVhost.getAttribute(eq("haHelperAddress"))).thenReturn(getHelperHostPort()); - - Map<String,String> bdbEnvConfig = new HashMap<String,String>(); - bdbEnvConfig.put(EnvironmentConfig.CLEANER_THREADS, TEST_NUMBER_OF_THREADS); - bdbEnvConfig.put(EnvironmentConfig.LOG_FILE_MAX, TEST_LOG_FILE_MAX); - - when(_modelVhost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(bdbEnvConfig); - - Map<String,String> repConfig = new HashMap<String,String>(); - repConfig.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, TEST_ELECTION_RETRIES); - repConfig.put(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, TEST_ENV_CONSISTENCY_TIMEOUT); - when(_modelVhost.getAttribute(eq("haReplicationConfig"))).thenReturn(repConfig); - - } - - private String getNodeNameForNodeAt(final int bdbPort) - { - return "node" + getName() + bdbPort; - } - - private String getNodeHostPortForNodeAt(final int bdbPort) - { - return _host + ":" + bdbPort; - } - - private String getHelperHostPort() - { - if (_masterPort == -1) - { - throw new IllegalStateException("Helper port not yet assigned."); - } - return _host + ":" + _masterPort; - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java index 730001d849..385681446a 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java @@ -22,20 +22,14 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.test.utils.QpidTestCase; public class MessageStoreCreatorTest extends QpidTestCase { - private static final String[] STORE_TYPES = {BDBMessageStore.TYPE}; - public void testMessageStoreCreator() { MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); - for (String type : STORE_TYPES) - { - MessageStore store = messageStoreCreator.createMessageStore(type); - assertNotNull("Store of type " + type + " is not created", store); - } - } -} + String type = new BDBMessageStoreFactory().getType(); + MessageStore store = messageStoreCreator.createMessageStore(type); + assertNotNull("Store of type " + type + " is not created", store); + }} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java new file mode 100644 index 0000000000..b19e18b204 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.Collections; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.Environment; + +public class StandardEnvironmentFacadeTest extends QpidTestCase +{ + protected File _storePath; + protected EnvironmentFacade _environmentFacade; + + protected void setUp() throws Exception + { + super.setUp(); + _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName()); + } + + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + if (_environmentFacade != null) + { + _environmentFacade.close(); + } + } + finally + { + if (_storePath != null) + { + FileUtils.delete(_storePath, true); + } + } + } + + public void testEnvironmentFacade() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + assertNotNull("Environment should not be null", ef); + Environment e = ef.getEnvironment(); + assertTrue("Environment is not valid", e.isValid()); + } + + public void testClose() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + ef.close(); + Environment e = ef.getEnvironment(); + + assertNull("Environment should be null after facade close", e); + } + + public void testOpenDatabases() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1", "test2"); + Database test1 = ef.getOpenDatabase("test1"); + Database test2 = ef.getOpenDatabase("test2"); + + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); + } + + public void testGetOpenDatabaseForNonExistingDatabase() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1"); + Database test1 = ef.getOpenDatabase("test1"); + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + try + { + ef.getOpenDatabase("test2"); + fail("An exception should be thrown for the non existing database"); + } + catch(IllegalArgumentException e) + { + assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage()); + } + } + + EnvironmentFacade getEnvironmentFacade() throws Exception + { + if (_environmentFacade == null) + { + _environmentFacade = createEnvironmentFacade(); + } + return _environmentFacade; + } + + EnvironmentFacade createEnvironmentFacade() + { + return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap()); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java new file mode 100644 index 0000000000..a05a30b459 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.qpid.server.configuration.ConfigurationEntry; +import org.apache.qpid.server.configuration.ConfigurationEntryStore; +import org.apache.qpid.server.configuration.RecovererProvider; +import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; + +public class VirtualHostTest extends QpidTestCase +{ + + private Broker _broker; + private StatisticsGatherer _statisticsGatherer; + private RecovererProvider _recovererProvider; + private File _configFile; + private File _bdbStorePath; + private VirtualHost _host; + private ConfigurationEntryStore _store; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _store = mock(ConfigurationEntryStore.class); + _broker = BrokerTestHelper.createBrokerMock(); + TaskExecutor taslExecutor = mock(TaskExecutor.class); + when(taslExecutor.isTaskExecutorThread()).thenReturn(true); + when(_broker.getTaskExecutor()).thenReturn(taslExecutor); + + + _statisticsGatherer = mock(StatisticsGatherer.class); + + _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis()); + _bdbStorePath.deleteOnExit(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + if (_host != null) + { + _host.setDesiredState(_host.getState(), State.STOPPED); + } + } + finally + { + if (_configFile != null) + { + _configFile.delete(); + } + if (_bdbStorePath != null) + { + FileUtils.delete(_bdbStorePath, true); + } + super.tearDown(); + } + } + + + public void testCreateBdbVirtualHostFromConfigurationFile() + { + String hostName = getName(); + long logFileMax = 2000000; + _host = createHostFromConfiguration(hostName, logFileMax); + _host.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected host name", hostName, _host.getName()); + assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType()); + assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE)); + assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); + EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig(); + assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); + + } + + public void testCreateBdbHaVirtualHostFromConfigurationFile() + { + String hostName = getName(); + + String repStreamTimeout = "2 h"; + String nodeName = "node"; + String groupName = "group"; + String nodeHostPort = "localhost:" + findFreePort(); + String helperHostPort = nodeHostPort; + String durability = "NO_SYNC,SYNC,NONE"; + _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout); + _host.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected host name", hostName, _host.getName()); + assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType()); + assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE)); + assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); + ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment(); + ReplicationConfig repConfig = environment.getRepConfig(); + assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME)); + assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME)); + assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT)); + assertEquals("Unexpected JE replication nodeHostPort", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS)); + assertEquals("Unexpected JE replication nodeHostPort", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY)); + assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); + } + + private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children) + { + ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes, + children, _store); + + return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker); + } + + private VirtualHost createHost(Map<String, Object> attributes) + { + return createHost(attributes, Collections.<UUID> emptySet()); + } + + private VirtualHost createHostFromConfiguration(String hostName, long logFileMax) + { + String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">" + + "<store><class>" + BDBMessageStore.class.getName() + "</class>" + + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>" + + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>" + + "</store>" + + "</" + hostName + "></virtualhost></virtualhosts>"; + Map<String, Object> attributes = writeConfigAndGenerateAttributes(content); + return createHost(attributes); + } + + + private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout) + { + String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">" + + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>" + + "<store><class>" + BDBMessageStore.class.getName() + "</class>" + + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>" + + "<highAvailability>" + + "<groupName>" + groupName + "</groupName>" + + "<nodeName>" + nodeName + "</nodeName>" + + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>" + + "<helperHostPort>" + helperHostPort + "</helperHostPort>" + + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>" + + "</highAvailability>" + + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>" + + "</store>" + + "</" + hostName + "></virtualhost></virtualhosts>"; + Map<String, Object> attributes = writeConfigAndGenerateAttributes(content); + return createHost(attributes); + } + + private Map<String, Object> writeConfigAndGenerateAttributes(String content) + { + _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath()); + return attributes; + } +} + +
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java new file mode 100644 index 0000000000..cd7dd69c46 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -0,0 +1,336 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Environment; +import com.sleepycat.je.rep.ReplicatedEnvironment.State; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + +public class ReplicatedEnvironmentFacadeTest extends QpidTestCase +{ + + private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); + private static final int LISTENER_TIMEOUT = 5; + private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; + private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_NODE_NAME = "testNodeName"; + private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT; + private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT; + private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); + private static final boolean TEST_DESIGNATED_PRIMARY = false; + private static final boolean TEST_COALESCING_SYNC = true; + private static final int TEST_PRIORITY = 1; + private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0; + + private File _storePath; + private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); + private VirtualHost _virtualHost = mock(VirtualHost.class); + + public void setUp() throws Exception + { + super.setUp(); + + TaskExecutor taskExecutor = mock(TaskExecutor.class); + when(taskExecutor.isTaskExecutorThread()).thenReturn(true); + when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor); + + _storePath = TestFileUtils.createTestDirectory("bdb", true); + + setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100"); + } + + @Override + public void tearDown() throws Exception + { + try + { + for (EnvironmentFacade ef : _nodes.values()) + { + ef.close(); + } + } + finally + { + try + { + if (_storePath != null) + { + FileUtils.delete(_storePath, true); + } + } + finally + { + super.tearDown(); + } + } + } + public void testEnvironmentFacade() throws Exception + { + EnvironmentFacade ef = createMaster(); + assertNotNull("Environment should not be null", ef); + Environment e = ef.getEnvironment(); + assertTrue("Environment is not valid", e.isValid()); + } + + public void testClose() throws Exception + { + EnvironmentFacade ef = createMaster(); + ef.close(); + Environment e = ef.getEnvironment(); + + assertNull("Environment should be null after facade close", e); + } + + public void testOpenDatabases() throws Exception + { + EnvironmentFacade ef = createMaster(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1", "test2"); + Database test1 = ef.getOpenDatabase("test1"); + Database test2 = ef.getOpenDatabase("test2"); + + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); + } + + public void testGetOpenDatabaseForNonExistingDatabase() throws Exception + { + EnvironmentFacade ef = createMaster(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1"); + Database test1 = ef.getOpenDatabase("test1"); + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + try + { + ef.getOpenDatabase("test2"); + fail("An exception should be thrown for the non existing database"); + } + catch(IllegalArgumentException e) + { + assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage()); + } + } + + public void testGetGroupName() throws Exception + { + assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName()); + } + + public void testGetNodeName() throws Exception + { + assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName()); + } + + public void testLastKnownReplicationTransactionId() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId(); + assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0); + } + + public void testGetNodeHostPort() throws Exception + { + assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort()); + } + + public void testGetHelperHostPort() throws Exception + { + assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort()); + } + + public void testGetDurability() throws Exception + { + assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability()); + } + + public void testIsCoalescingSync() throws Exception + { + assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync()); + } + + public void testGetNodeState() throws Exception + { + assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState()); + } + + + public void testPriority() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority()); + Future<Void> future = facade.setPriority(TEST_PRIORITY + 1); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority()); + } + + public void testDesignatedPrimary() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + } + + public void testElectableGroupSizeOverride() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride()); + Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride()); + } + + public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception + { + final CountDownLatch masterLatch = new CountDownLatch(1); + final AtomicInteger masterStateChangeCount = new AtomicInteger(); + final CountDownLatch unknownLatch = new CountDownLatch(1); + final AtomicInteger unknownStateChangeCount = new AtomicInteger(); + StateChangeListener stateChangeListener = new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + if (stateChangeEvent.getState() == State.MASTER) + { + masterStateChangeCount.incrementAndGet(); + masterLatch.countDown(); + } + else if (stateChangeEvent.getState() == State.UNKNOWN) + { + unknownStateChangeCount.incrementAndGet(); + unknownLatch.countDown(); + } + } + }; + + addNode(State.MASTER, stateChangeListener); + assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + int replica2Port = getNextAvailable(replica1Port + 1); + String node2NodeHostPort = "localhost:" + replica2Port; + + ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort); + + // close replicas + replica1.close(); + replica2.close(); + + assertTrue("Environment should be recreated and go into unknown state", + unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get()); + assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); + } + + public void testCloseStateTransitions() throws Exception + { + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); + + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); + replicatedEnvironmentFacade.close(); + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); + } + + private ReplicatedEnvironmentFacade createMaster() throws Exception + { + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener); + assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + return env; + } + + private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception + { + TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA); + ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); + assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange); + return replicaEnvironmentFacade; + } + + private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, + State desiredState, StateChangeListener stateChangeListener) + { + ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); + ref.setStateChangeListener(stateChangeListener); + _nodes.put(nodeName, ref); + return ref; + } + + private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener) + { + return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener); + } + + private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) + { + ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class); + when(node.getName()).thenReturn(nodeName); + when(node.getHostPort()).thenReturn(nodeHostPort); + when(node.isDesignatedPrimary()).thenReturn(designatedPrimary); + when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); + when(node.getPriority()).thenReturn(TEST_PRIORITY); + when(node.getGroupName()).thenReturn(TEST_GROUP_NAME); + when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT); + when(node.getDurability()).thenReturn(TEST_DURABILITY); + when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC); + + Map<String, String> repConfig = new HashMap<String, String>(); + repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); + repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + when(node.getReplicationParameters()).thenReturn(repConfig); + when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); + return node; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java new file mode 100644 index 0000000000..0870191b35 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.sleepycat.je.rep.ReplicatedEnvironment.State; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + +class TestStateChangeListener implements StateChangeListener +{ + private final Set<State> _expectedStates; + private final CountDownLatch _latch; + private final AtomicReference<State> _currentActualState = new AtomicReference<State>(); + + public TestStateChangeListener(State expectedState) + { + this(Collections.singleton(expectedState)); + } + + public TestStateChangeListener(Set<State> expectedStates) + { + _expectedStates = new HashSet<State>(expectedStates); + _latch = new CountDownLatch(1); + } + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + _currentActualState.set(stateChangeEvent.getState()); + if (_expectedStates.contains(stateChangeEvent.getState())) + { + _latch.countDown(); + } + } + + public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException + { + return _latch.await(timeout, timeUnit); + } + + public State getCurrentActualState() + { + return _currentActualState.get(); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java index 400ac12792..810f4a1fca 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java @@ -26,7 +26,7 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.OperationStatus; -import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase @@ -94,7 +94,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase catch(ServerScopedRuntimeException ex) { assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: " - + AbstractBDBMessageStore.VERSION, ex.getMessage()); + + BDBMessageStore.VERSION, ex.getMessage()); } } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index bd0411619e..04817ad36c 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -20,11 +20,15 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.UUID; + import org.apache.qpid.server.store.StoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -41,6 +45,8 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.MessageStoreTest; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -73,7 +79,7 @@ public class BDBMessageStoreTest extends MessageStoreTest { MessageStore store = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(store); + BDBMessageStore bdbStore = assertBDBStore(store); // Create content ByteBuffers. // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. @@ -126,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTest /* * reload the store only (read-only) */ - AbstractBDBMessageStore readOnlyStore = reloadStore(bdbStore); + BDBMessageStore readOnlyStore = reloadStore(bdbStore); /* * Read back and validate the 0-8 message metadata and content @@ -225,14 +231,17 @@ public class BDBMessageStoreTest extends MessageStoreTest * Use this method instead of reloading the virtual host like other tests in order * to avoid the recovery handler deleting the message for not being on a queue. */ - private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception + private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception { messageStore.close(); - AbstractBDBMessageStore newStore = new BDBMessageStore(); - newStore.configure(getVirtualHostModel(),true); + BDBMessageStore newStore = new BDBMessageStore(); + + MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); + when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); + newStore.configureMessageStore(getVirtualHostModel(), recoveryHandler, null); - newStore.startWithNoRecover(); + newStore.activate(); return newStore; } @@ -287,7 +296,7 @@ public class BDBMessageStoreTest extends MessageStoreTest public void testGetContentWithOffset() throws Exception { MessageStore store = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(store); + BDBMessageStore bdbStore = assertBDBStore(store); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -347,7 +356,7 @@ public class BDBMessageStoreTest extends MessageStoreTest public void testMessageCreationAndRemoval() throws Exception { MessageStore store = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(store); + BDBMessageStore bdbStore = assertBDBStore(store); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -372,12 +381,12 @@ public class BDBMessageStoreTest extends MessageStoreTest assertEquals("Retrieved content when none was expected", 0, bdbStore.getContent(messageid_0_8, 0, dst)); } - private AbstractBDBMessageStore assertBDBStore(MessageStore store) + private BDBMessageStore assertBDBStore(MessageStore store) { assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); - return (AbstractBDBMessageStore) store; + return (BDBMessageStore) store; } private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store) @@ -410,7 +419,7 @@ public class BDBMessageStoreTest extends MessageStoreTest { MessageStore log = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(log); + BDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); TransactionLogResource mockQueue = new TransactionLogResource() @@ -460,7 +469,7 @@ public class BDBMessageStoreTest extends MessageStoreTest { MessageStore log = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(log); + BDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); TransactionLogResource mockQueue = new TransactionLogResource() @@ -506,7 +515,7 @@ public class BDBMessageStoreTest extends MessageStoreTest public void testOnDelete() throws Exception { MessageStore log = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(log); + BDBMessageStore bdbStore = assertBDBStore(log); String storeLocation = bdbStore.getStoreLocation(); File location = new File(storeLocation); @@ -529,7 +538,7 @@ public class BDBMessageStoreTest extends MessageStoreTest { MessageStore log = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(log); + BDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); TransactionLogResource mockQueue = new TransactionLogResource() diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index 4b50121a7a..e8d18971ad 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -38,6 +38,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -143,7 +144,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase CompositeData row = groupMembers.get(new Object[] {nodeName}); assertNotNull("Table does not contain row for node name " + nodeName, row); - assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + assertEquals(nodeHostPort, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT)); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java new file mode 100644 index 0000000000..4f1f830fd0 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.util.concurrent.ThreadFactory; + +public final class DaemonThreadFactory implements ThreadFactory +{ + private String _threadName; + public DaemonThreadFactory(String threadName) + { + _threadName = threadName; + } + + @Override + public Thread newThread(Runnable r) + { + Thread thread = new Thread(r, _threadName); + thread.setDaemon(true); + return thread; + } +}
\ No newline at end of file diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 7a4f92f0ca..9fc95c1861 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -29,6 +29,7 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -71,7 +72,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple _store = createStore(); ((DurableConfigurationStore)_store).configureConfigStore(vhost, null); - _store.configureMessageStore(vhost, mock(MessageStoreRecoveryHandler.class), null); + MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); + when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); + _store.configureMessageStore(vhost, recoveryHandler, null); + _store.activate(); _transactionResource = UUID.randomUUID(); _events = new ArrayList<Event>(); @@ -89,7 +93,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple { if (_store != null) { - _store.close(); + // _store.close(); } FileUtils.delete(_storeLocation, true); } |
