summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-06 15:43:08 +0000
committerKeith Wall <kwall@apache.org>2014-06-06 15:43:08 +0000
commit39249098b7b374c5e45d7139aa8b9df3aebad385 (patch)
treeab13b41b26d2036f5765e3a95b8692fe3903ce54 /qpid/java
parent53fd008b70676ce1382bec414bcd0d86299a4ced (diff)
downloadqpid-python-39249098b7b374c5e45d7139aa8b9df3aebad385.tar.gz
QPID-5800: [Java Broker} Refactor MessageStore implementations extracting a MessageStoreProvider interface.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600931 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java1802
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java1766
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java13
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java14
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java9
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java10
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java7
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java5
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java371
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java109
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java308
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java26
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java32
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java47
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java23
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java22
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java6
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java94
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java2
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java2
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java2
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java90
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java2
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java2
-rw-r--r--qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java29
-rw-r--r--qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java29
-rw-r--r--qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java4
-rw-r--r--qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java47
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java9
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java2
-rwxr-xr-xqpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java5
42 files changed, 2639 insertions, 2328 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
new file mode 100644
index 0000000000..2766c2ac98
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -0,0 +1,1802 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Xid;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
+import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.util.FileUtils;
+
+/**
+ * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept
+ * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove
+ * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
+ * dequeue messages to queues. <tr><td> Generate message identifiers. </table>
+ */
+public class BDBConfigurationStore implements MessageStoreProvider, DurableConfigurationStore
+{
+ private static final Logger LOGGER = Logger.getLogger(BDBConfigurationStore.class);
+
+ public static final int VERSION = 8;
+ private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
+ private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY";
+
+ private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
+ private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
+ private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
+
+ //TODO: Add upgrader to remove BRIDGES and LINKS
+ private static String BRIDGEDB_NAME = "BRIDGES";
+ private static String LINKDB_NAME = "LINKS";
+ private static String XID_DB_NAME = "XIDS";
+ private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME};
+ private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME };
+ private EnvironmentFacade _environmentFacade;
+ private final AtomicLong _messageId = new AtomicLong(0);
+
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
+
+ private long _totalStoreSize;
+
+ private final EnvironmentFacadeFactory _environmentFacadeFactory;
+
+ private volatile Committer _committer;
+
+ private boolean _isMessageStoreProvider;
+
+ private String _storeLocation;
+ private final BDBMessageStore _messageStoreFacade = new BDBMessageStore();
+
+ public BDBConfigurationStore()
+ {
+ this(new StandardEnvironmentFacadeFactory());
+ }
+
+ public BDBConfigurationStore(EnvironmentFacadeFactory environmentFacadeFactory)
+ {
+ _environmentFacadeFactory = environmentFacadeFactory;
+ }
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ if (_environmentFacade == null)
+ {
+ EnvironmentFacadeTask[] initialisationTasks = null;
+ _isMessageStoreProvider = MapValueConverter.getBooleanAttribute(VirtualHostNode.IS_MESSAGE_STORE_PROVIDER, storeSettings, false);
+ if (_isMessageStoreProvider)
+ {
+ String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+ System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
+ System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+ initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask()};
+ }
+ else
+ {
+ initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
+ }
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
+ _storeLocation = _environmentFacade.getStoreLocation();
+ }
+ else
+ {
+ throw new IllegalStateException("The database have been already opened as message store");
+ }
+ }
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+ {
+ checkConfigurationStoreOpen();
+
+ try
+ {
+ handler.begin();
+ doVisitAllConfiguredObjectRecords(handler);
+ handler.end();
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
+ }
+
+ }
+
+ private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+ {
+ Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>();
+ Cursor objectsCursor = null;
+ Cursor hierarchyCursor = null;
+ try
+ {
+ objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+
+ while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+ BDBConfiguredObjectRecord configuredObject =
+ (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
+ configuredObjects.put(configuredObject.getId(), configuredObject);
+ }
+
+ // set parents
+ hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
+ while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
+ UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+ BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
+ if(child != null)
+ {
+ ConfiguredObjectRecord parent = configuredObjects.get(parentId);
+ if(parent != null)
+ {
+ child.addParent(hk.getParentType(), parent);
+ }
+ }
+ }
+ }
+ finally
+ {
+ closeCursorSafely(objectsCursor);
+ closeCursorSafely(hierarchyCursor);
+ }
+
+ for (ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ boolean shoudlContinue = handler.handle(record);
+ if (!shoudlContinue)
+ {
+ break;
+ }
+ }
+
+ }
+
+ public String getStoreLocation()
+ {
+ return _storeLocation;
+ }
+
+ public EnvironmentFacade getEnvironmentFacade()
+ {
+ return _environmentFacade;
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(true, false))
+ {
+ if (!_messageStoreOpen.get())
+ {
+ closeEnvironment();
+ }
+ }
+ }
+
+ private void closeEnvironment()
+ {
+ if (_environmentFacade != null)
+ {
+ try
+ {
+ _environmentFacade.close();
+ _environmentFacade = null;
+ }
+ catch(DatabaseException e)
+ {
+ throw new StoreException("Exception occurred on message store close", e);
+ }
+ }
+ }
+
+ private void closeCursorSafely(Cursor cursor) throws StoreException
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot close cursor", e);
+ }
+ }
+ }
+
+ private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx)
+ {
+ try
+ {
+ if (tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch (DatabaseException e1)
+ {
+ // We need the possible side effect of the handler restarting the environment but don't care about the exception
+ _environmentFacade.handleDatabaseException(null, e1);
+ LOGGER.warn(errorMessage, e1);
+ }
+ }
+
+ @Override
+ public void create(ConfiguredObjectRecord configuredObject) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Create " + configuredObject);
+ }
+
+ com.sleepycat.je.Transaction txn = null;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ storeConfiguredObjectEntry(txn, configuredObject);
+ txn.commit();
+ txn = null;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject
+ + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if (txn != null)
+ {
+ abortTransactionIgnoringException("Error creating configured object", txn);
+ }
+ }
+ }
+
+ @Override
+ public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+
+ com.sleepycat.je.Transaction txn = null;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ for(ConfiguredObjectRecord record : objects)
+ {
+ if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS)
+ {
+ removed.add(record.getId());
+ }
+ }
+
+ txn.commit();
+ txn = null;
+ return removed.toArray(new UUID[removed.size()]);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e);
+ }
+ finally
+ {
+ if (txn != null)
+ {
+ abortTransactionIgnoringException("Error deleting configured objects", txn);
+ }
+ }
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+
+ com.sleepycat.je.Transaction txn = null;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ for(ConfiguredObjectRecord record : records)
+ {
+ update(createIfNecessary, record, txn);
+ }
+ txn.commit();
+ txn = null;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e);
+ }
+ finally
+ {
+ if (txn != null)
+ {
+ abortTransactionIgnoringException("Error updating configuration details within the store", txn);
+ }
+ }
+ }
+
+ private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record);
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
+ keyBinding.objectToEntry(record.getId(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ DatabaseEntry newValue = new DatabaseEntry();
+ ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
+
+ OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT);
+ final boolean isNewRecord = status == OperationStatus.NOTFOUND;
+ if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord))
+ {
+ // write the updated entry to the store
+ configuredObjectBinding.objectToEntry(record, newValue);
+ status = getConfiguredObjectsDb().put(txn, key, newValue);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error updating configuration details within the store: " + status);
+ }
+ if(isNewRecord)
+ {
+ writeHierarchyRecords(txn, record);
+ }
+ }
+ else if (status != OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Error finding configuration details within the store: " + status);
+ }
+ }
+
+
+ private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Storing configured object record: " + configuredObject);
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(configuredObject.getId(), key);
+ DatabaseEntry value = new DatabaseEntry();
+ ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
+
+ queueBinding.objectToEntry(configuredObject, value);
+ try
+ {
+ OperationStatus status = getConfiguredObjectsDb().put(txn, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error writing configured object " + configuredObject + " to database: "
+ + status);
+ }
+ writeHierarchyRecords(txn, configuredObject);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
+ + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject)
+ {
+ OperationStatus status;
+ HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance();
+ DatabaseEntry hierarchyKey = new DatabaseEntry();
+ DatabaseEntry hierarchyValue = new DatabaseEntry();
+
+ for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet())
+ {
+
+ hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey);
+ UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue);
+ status = getConfiguredObjectHierarchyDb().put(txn, hierarchyKey, hierarchyValue);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: "
+ + status);
+ }
+ }
+ }
+
+ private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException
+ {
+ UUID id = record.getId();
+ Map<String, ConfiguredObjectRecord> parents = record.getParents();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Removing configured object: " + id);
+ }
+ DatabaseEntry key = new DatabaseEntry();
+
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(id, key);
+ OperationStatus status = getConfiguredObjectsDb().delete(tx, key);
+ if(status == OperationStatus.SUCCESS)
+ {
+ for(String parentType : parents.keySet())
+ {
+ DatabaseEntry hierarchyKey = new DatabaseEntry();
+ HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance();
+ keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey);
+ getConfiguredObjectHierarchyDb().delete(tx, hierarchyKey);
+ }
+ }
+ return status;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStoreFacade;
+ }
+
+ private void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ public void onDelete()
+ {
+ if (!_configurationStoreOpen.get() && !_messageStoreOpen.get())
+ {
+ if (_storeLocation != null)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting store " + _storeLocation);
+ }
+
+ File location = new File(_storeLocation);
+ if (location.exists())
+ {
+ if (!FileUtils.delete(location, true))
+ {
+ LOGGER.error("Cannot delete " + _storeLocation);
+ }
+ }
+ }
+ }
+ }
+
+ private Database getConfiguredObjectsDb()
+ {
+ return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
+ }
+
+ private Database getConfiguredObjectHierarchyDb()
+ {
+ return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME);
+ }
+
+ private Database getMessageContentDb()
+ {
+ return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME);
+ }
+
+ private Database getMessageMetaDataDb()
+ {
+ return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME);
+ }
+
+ private Database getDeliveryDb()
+ {
+ return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME);
+ }
+
+ private Database getXidDb()
+ {
+ return _environmentFacade.getOpenDatabase(XID_DB_NAME);
+ }
+
+ class UpgradeTask implements EnvironmentFacadeTask
+ {
+ private final ConfiguredObject<?> _parent;
+
+ public UpgradeTask(ConfiguredObject<?> parent)
+ {
+ _parent = parent;
+ }
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary();
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot upgrade store", e);
+ }
+ }
+ }
+
+ class OpenDatabasesTask implements EnvironmentFacadeTask
+ {
+ private String[] _names;
+
+ public OpenDatabasesTask(String[] names)
+ {
+ _names = names;
+ }
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ facade.openDatabases(dbConfig, _names);
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot open databases", e);
+ }
+ }
+
+ }
+
+ class DiskSpaceTask implements EnvironmentFacadeTask
+ {
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize();
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot evaluate disk store size", e);
+ }
+ }
+
+ }
+
+ public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
+ {
+ private long _maxId;
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ ((BDBMessageStore)getMessageStore()).visitMessagesInternal(this, facade);
+ _messageId.set(_maxId);
+ }
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ long id = storedMessage.getMessageNumber();
+ if (_maxId<id)
+ {
+ _maxId = id;
+ }
+ return true;
+ }
+ }
+
+ class BDBMessageStore implements MessageStore
+ {
+ private static final int LOCK_RETRY_ATTEMPTS = 5;
+
+ private final EventManager _eventManager = new EventManager();
+
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
+ @Override
+ public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+
+ Object overfullAttr = messageStoreSettings.get(OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(UNDERFULL_SIZE);
+
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
+ if (_environmentFacade == null)
+ {
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
+ new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
+ _storeLocation = _environmentFacade.getStoreLocation();
+ }
+
+ _committer = _environmentFacade.createCommitter(parent.getName());
+ _committer.start();
+ }
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ {
+ if (metaData.isPersistent())
+ {
+ return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage<T>(getNewMessageId(), metaData);
+ }
+ }
+
+ /**
+ * Return a valid, currently unused message id.
+ *
+ * @return A fresh message id.
+ */
+ private long getNewMessageId()
+ {
+ return _messageId.incrementAndGet();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public org.apache.qpid.server.store.Transaction newTransaction()
+ {
+ checkMessageStoreOpen();
+
+ return new BDBTransaction();
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ if (_messageStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ if (_committer != null)
+ {
+ _committer.stop();
+ }
+ }
+ finally
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ closeEnvironment();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void addEventListener(final EventListener eventListener, final Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+
+ @Override
+ public void onDelete()
+ {
+ BDBConfigurationStore.this.onDelete();
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return BDBConfigurationStore.this.getStoreLocation();
+ }
+
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+ visitMessagesInternal(handler, _environmentFacade);
+ }
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ entries.add(entry);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(queueId, messageId))
+ {
+ break;
+ }
+ }
+
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getXidDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ preparedTransaction.getEnqueues(), preparedTransaction.getDequeues()))
+ {
+ break;
+ }
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ /**
+ * Retrieves message meta-data.
+ *
+ * @param messageId The message to get the meta-data for.
+ *
+ * @return The message meta data.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = "
+ + messageId + "): called");
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
+
+ try
+ {
+ OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Metadata not found for message with id " + messageId);
+ }
+
+ StorableMessageMetaData mdd = messageBinding.entryToObject(value);
+
+ return mdd;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
+ }
+ }
+
+ void removeMessage(long messageId, boolean sync) throws StoreException
+ {
+ boolean complete = false;
+ com.sleepycat.je.Transaction tx = null;
+
+ Random rand = null;
+ int attempts = 0;
+ try
+ {
+ do
+ {
+ tx = null;
+ try
+ {
+ tx = _environmentFacade.getEnvironment().beginTransaction(null, null);
+
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Removing message id " + messageId);
+ }
+
+
+ OperationStatus status = getMessageMetaDataDb().delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleted metadata for message " + messageId);
+ }
+
+ //now remove the content data from the store if there is any.
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ getMessageContentDb().delete(tx, contentKeyEntry);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleted content for message " + messageId);
+ }
+
+ _environmentFacade.commit(tx);
+ _committer.commit(tx, sync);
+
+ complete = true;
+ tx = null;
+ }
+ catch (LockConflictException e)
+ {
+ try
+ {
+ if(tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch(DatabaseException e2)
+ {
+ LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e);
+ }
+
+
+ LOGGER.warn("Lock timeout exception. Retrying (attempt "
+ + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
+
+ if(++attempts < LOCK_RETRY_ATTEMPTS)
+ {
+ if(rand == null)
+ {
+ rand = new Random();
+ }
+
+ try
+ {
+ Thread.sleep(500l + (long)(500l * rand.nextDouble()));
+ }
+ catch (InterruptedException e1)
+ {
+
+ }
+ }
+ else
+ {
+ // rethrow the lock conflict exception since we could not solve by retrying
+ throw _environmentFacade.handleDatabaseException("Cannot remove messages", e);
+ }
+ }
+ }
+ while(!complete);
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.error("Unexpected BDB exception", e);
+
+ try
+ {
+ abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
+ }
+ finally
+ {
+ tx = null;
+ }
+
+ throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ try
+ {
+ abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
+ }
+ finally
+ {
+ tx = null;
+ }
+ }
+ }
+
+
+ /**
+ * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
+ * from the specified offset in the message.
+ *
+ * @param messageId The message to get the data for.
+ * @param offset The offset of the data within the message.
+ * @param dst The destination of the content read back
+ *
+ * @return The number of bytes inserted into the destination
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
+ {
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding contentTupleBinding = ContentBinding.getInstance();
+
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
+ }
+
+ try
+ {
+
+ int written = 0;
+ OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+ if (status == OperationStatus.SUCCESS)
+ {
+ byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
+ int size = dataAsBytes.length;
+ if (offset > size)
+ {
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
+
+ }
+
+ written = size - offset;
+ if(written > dst.remaining())
+ {
+ written = dst.remaining();
+ }
+
+ dst.put(dataAsBytes, offset, written);
+ }
+ return written;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+ if (!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot visit messages", e);
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Stores a chunk of message data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param offset The offset of the data chunk in the message.
+ * @param contentBody The content of the data chunk.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
+ ByteBuffer contentBody) throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding messageBinding = ContentBinding.getInstance();
+ messageBinding.objectToEntry(contentBody.array(), value);
+ try
+ {
+ OperationStatus status = getMessageContentDb().put(tx, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error adding content for message id " + messageId + ": " + status);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Stores message meta-data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param messageMetaData The message meta data to store.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
+ StorableMessageMetaData messageMetaData)
+ throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("storeMetaData called for transaction " + tx
+ + ", messageId " + messageId
+ + ", messageMetaData " + messageMetaData);
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
+ messageBinding.objectToEntry(messageMetaData, value);
+ try
+ {
+ getMessageMetaDataDb().put(tx, key, value);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+
+ /**
+ * Places a message onto a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The the queue to place the message on.
+ * @param messageId The message to enqueue.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws StoreException
+ {
+
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
+ keyBinding.objectToEntry(dd, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0, value);
+
+ try
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
+ }
+ getDeliveryDb().put(tx, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
+ throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
+ + queue.getName() + " with id " + queue.getId() + " to database", e);
+ }
+ }
+
+ /**
+ * Extracts a message from a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The queue to take the message from.
+ * @param messageId The message to dequeue.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws StoreException
+ {
+
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
+ UUID id = queue.getId();
+ keyBinding.objectToEntry(queueEntryKey, key);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Dequeue message id " + messageId + " from queue "
+ + queue.getName() + " with id " + id);
+ }
+
+ try
+ {
+
+ OperationStatus status = getDeliveryDb().delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Unable to find message with id " + messageId + " on queue "
+ + queue.getName() + " with id " + id);
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Unable to remove message with id " + messageId + " on queue"
+ + queue.getName() + " with id " + id);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Removed message " + messageId + " on queue "
+ + queue.getName() + " with id " + id);
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+
+ LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e);
+
+ throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e);
+ }
+ }
+
+ private void recordXid(com.sleepycat.je.Transaction txn,
+ long format,
+ byte[] globalId,
+ byte[] branchId,
+ org.apache.qpid.server.store.Transaction.Record[] enqueues,
+ org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidBinding keyBinding = XidBinding.getInstance();
+ keyBinding.objectToEntry(xid,key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ valueBinding.objectToEntry(preparedTransaction, value);
+
+ try
+ {
+ getXidDb().put(txn, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.error("Failed to write xid: " + e.getMessage(), e);
+ throw _environmentFacade.handleDatabaseException("Error writing xid to database", e);
+ }
+ }
+
+ private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
+ throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidBinding keyBinding = XidBinding.getInstance();
+
+ keyBinding.objectToEntry(xid, key);
+
+
+ try
+ {
+
+ OperationStatus status = getXidDb().delete(txn, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Unable to find xid");
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Unable to remove xid");
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+
+ LOGGER.error("Failed to remove xid in transaction " + txn, e);
+
+ throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Commits all operations performed within a given transaction.
+ *
+ * @param tx The transaction to commit all operations for.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException
+ {
+ if (tx == null)
+ {
+ throw new StoreException("Fatal internal error: transactional is null at commitTran");
+ }
+
+ _environmentFacade.commit(tx);
+ StoreFuture result = _committer.commit(tx, syncCommit);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ String transactionType = syncCommit ? "synchronous" : "asynchronous";
+ LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
+ }
+
+ return result;
+ }
+
+ /**
+ * Abandons all operations performed within a given transaction.
+ *
+ * @param tx The transaction to abandon.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("abortTran called for transaction " + tx);
+ }
+
+ try
+ {
+ tx.abort();
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e);
+ }
+ }
+
+ private void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
+ private void storedSizeChangeOccurred(final int delta) throws StoreException
+ {
+ try
+ {
+ storedSizeChange(delta);
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Stored size change exception", e);
+ }
+ }
+
+ private void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized (this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 2*delta;
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ _totalStoreSize = getSizeOnDisk();
+
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ _totalStoreSize = getSizeOnDisk();
+
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk();
+
+ _totalStoreSize = getSizeOnDisk();
+
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ }
+ }
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+ private void reduceSizeOnDisk()
+ {
+ _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
+ boolean cleaned = false;
+ while (_environmentFacade.getEnvironment().cleanLog() > 0)
+ {
+ cleaned = true;
+ }
+ if (cleaned)
+ {
+ CheckpointConfig force = new CheckpointConfig();
+ force.setForce(true);
+ _environmentFacade.getEnvironment().checkpoint(force);
+ }
+
+
+ _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ }
+
+ private long getSizeOnDisk()
+ {
+ return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize();
+ }
+
+ private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ {
+
+ private final long _messageId;
+ private final boolean _isRecovered;
+
+ private T _metaData;
+ private volatile SoftReference<T> _metaDataRef;
+
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
+ StoredBDBMessage(long messageId, T metaData)
+ {
+ this(messageId, metaData, false);
+ }
+
+ StoredBDBMessage(long messageId, T metaData, boolean isRecovered)
+ {
+ _messageId = messageId;
+ _isRecovered = isRecovered;
+
+ if(!_isRecovered)
+ {
+ _metaData = metaData;
+ }
+ _metaDataRef = new SoftReference<T>(metaData);
+ }
+
+ @Override
+ public T getMetaData()
+ {
+ T metaData = _metaDataRef.get();
+ if(metaData == null)
+ {
+ checkMessageStoreOpen();
+
+ metaData = (T) getMessageMetaData(_messageId);
+ _metaDataRef = new SoftReference<T>(metaData);
+ }
+
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+ {
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData, 0, _data, 0, oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ checkMessageStoreOpen();
+
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ return ByteBuffer.wrap(data,offsetInMessage,size);
+ }
+ else
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ int length = getContent(offsetInMessage, buf);
+ buf.limit(length);
+ buf.position(0);
+ return buf;
+ }
+ }
+
+ synchronized void store(com.sleepycat.je.Transaction txn)
+ {
+ if (!stored())
+ {
+ try
+ {
+ _dataRef = new SoftReference<byte[]>(_data);
+ BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
+ BDBMessageStore.this.addContent(txn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+ }
+
+ @Override
+ public synchronized StoreFuture flushToStore()
+ {
+ if(!stored())
+ {
+ checkMessageStoreOpen();
+
+ com.sleepycat.je.Transaction txn;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(
+ null, null);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
+ }
+ store(txn);
+ _environmentFacade.commit(txn);
+ _committer.commit(txn, true);
+
+ storedSizeChangeOccurred(getMetaData().getContentSize());
+ }
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ checkMessageStoreOpen();
+
+ int delta = getMetaData().getContentSize();
+ removeMessage(_messageId, false);
+ storedSizeChangeOccurred(-delta);
+ }
+
+ private boolean stored()
+ {
+ return _metaData == null || _isRecovered;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass() + "[messageId=" + _messageId + "]";
+ }
+ }
+
+
+ private class BDBTransaction implements org.apache.qpid.server.store.Transaction
+ {
+ private com.sleepycat.je.Transaction _txn;
+ private int _storeSizeIncrease;
+
+ private BDBTransaction() throws StoreException
+ {
+ try
+ {
+ _txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e);
+ }
+ }
+
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ if(message.getStoredMessage() instanceof StoredBDBMessage)
+ {
+ final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+
+ BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ @Override
+ public void commitTran() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ BDBMessageStore.this.commitTranImpl(_txn, true);
+ BDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
+ }
+
+ @Override
+ public StoreFuture commitTranAsync() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ BDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
+ return BDBMessageStore.this.commitTranImpl(_txn, false);
+ }
+
+ @Override
+ public void abortTran() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ BDBMessageStore.this.abortTran(_txn);
+ }
+
+ @Override
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
+ }
+
+ @Override
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
+ Record[] dequeues) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
+ }
+ }
+
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
deleted file mode 100644
index 9fc90f9d7c..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ /dev/null
@@ -1,1766 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.EventManager;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.Xid;
-import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
-import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
-import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.util.FileUtils;
-
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-
-/**
- * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept
- * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove
- * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
- * dequeue messages to queues. <tr><td> Generate message identifiers. </table>
- */
-public class BDBMessageStore implements MessageStore, DurableConfigurationStore
-{
- private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
-
- public static final int VERSION = 8;
- private static final int LOCK_RETRY_ATTEMPTS = 5;
- private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
- private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY";
-
- private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
- private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
- private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
-
- //TODO: Add upgrader to remove BRIDGES and LINKS
- private static String BRIDGEDB_NAME = "BRIDGES";
- private static String LINKDB_NAME = "LINKS";
- private static String XID_DB_NAME = "XIDS";
- private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME};
- private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME };
-
- private EnvironmentFacade _environmentFacade;
- private final AtomicLong _messageId = new AtomicLong(0);
-
- private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
- private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
-
- private long _totalStoreSize;
- private boolean _limitBusted;
- private long _persistentSizeLowThreshold;
- private long _persistentSizeHighThreshold;
-
- private final EventManager _eventManager = new EventManager();
-
- private final EnvironmentFacadeFactory _environmentFacadeFactory;
-
- private volatile Committer _committer;
-
- private boolean _isMessageStoreProvider;
-
- private String _storeLocation;
-
- public BDBMessageStore()
- {
- this(new StandardEnvironmentFacadeFactory());
- }
-
- public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
- {
- _environmentFacadeFactory = environmentFacadeFactory;
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- _eventManager.addEventListener(eventListener, events);
- }
-
- @Override
- public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
- {
- if (_configurationStoreOpen.compareAndSet(false, true))
- {
- if (_environmentFacade == null)
- {
- EnvironmentFacadeTask[] initialisationTasks = null;
- _isMessageStoreProvider = MapValueConverter.getBooleanAttribute(VirtualHostNode.IS_MESSAGE_STORE_PROVIDER, storeSettings, false);
- if (_isMessageStoreProvider)
- {
- String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
- System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
- System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
- initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() };
- }
- else
- {
- initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
- }
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
- _storeLocation = _environmentFacade.getStoreLocation();
- }
- else
- {
- throw new IllegalStateException("The database have been already opened as message store");
- }
- }
- }
-
- @Override
- public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
- {
- checkConfigurationStoreOpen();
-
- try
- {
- handler.begin();
- doVisitAllConfiguredObjectRecords(handler);
- handler.end();
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
- }
-
- }
-
- private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
- {
- Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>();
- Cursor objectsCursor = null;
- Cursor hierarchyCursor = null;
- try
- {
- objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
-
- while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- BDBConfiguredObjectRecord configuredObject =
- (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
- configuredObjects.put(configuredObject.getId(), configuredObject);
- }
-
- // set parents
- hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
- while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
- UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
- BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
- if(child != null)
- {
- ConfiguredObjectRecord parent = configuredObjects.get(parentId);
- if(parent != null)
- {
- child.addParent(hk.getParentType(), parent);
- }
- }
- }
- }
- finally
- {
- closeCursorSafely(objectsCursor);
- closeCursorSafely(hierarchyCursor);
- }
-
- for (ConfiguredObjectRecord record : configuredObjects.values())
- {
- boolean shoudlContinue = handler.handle(record);
- if (!shoudlContinue)
- {
- break;
- }
- }
-
- }
-
- @Override
- public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) throws StoreException
- {
- if (_messageStoreOpen.compareAndSet(false, true))
- {
-
- Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
- Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
-
- _persistentSizeHighThreshold = overfullAttr == null ? -1l :
- overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
- _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
- underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
-
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
-
- if (_environmentFacade == null)
- {
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
- new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
- _storeLocation = _environmentFacade.getStoreLocation();
- }
-
- _committer = _environmentFacade.createCommitter(parent.getName());
- _committer.start();
- }
- }
-
- @Override
- public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
- {
- checkMessageStoreOpen();
-
- return new BDBTransaction();
- }
-
- @Override
- public String getStoreLocation()
- {
- return _storeLocation;
- }
-
- public EnvironmentFacade getEnvironmentFacade()
- {
- return _environmentFacade;
- }
-
- @Override
- public void closeMessageStore() throws StoreException
- {
- if (_messageStoreOpen.compareAndSet(true, false))
- {
- try
- {
- if (_committer != null)
- {
- _committer.close();
- }
- }
- finally
- {
- if (!_configurationStoreOpen.get())
- {
- closeEnvironment();
- }
- }
- }
- }
-
- @Override
- public void closeConfigurationStore() throws StoreException
- {
- if (_configurationStoreOpen.compareAndSet(true, false))
- {
- if (!_messageStoreOpen.get())
- {
- closeEnvironment();
- }
- }
- }
-
- private void closeEnvironment()
- {
- if (_environmentFacade != null)
- {
- try
- {
- _environmentFacade.close();
- _environmentFacade = null;
- }
- catch(DatabaseException e)
- {
- throw new StoreException("Exception occured on message store close", e);
- }
- }
- }
-
- private void closeCursorSafely(Cursor cursor) throws StoreException
- {
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot close cursor", e);
- }
- }
- }
-
-
- void removeMessage(long messageId, boolean sync) throws StoreException
- {
- boolean complete = false;
- com.sleepycat.je.Transaction tx = null;
-
- Random rand = null;
- int attempts = 0;
- try
- {
- do
- {
- tx = null;
- try
- {
- tx = _environmentFacade.beginTransaction();
-
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removing message id " + messageId);
- }
-
-
- OperationStatus status = getMessageMetaDataDb().delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
- messageId);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleted metadata for message " + messageId);
- }
-
- //now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- getMessageContentDb().delete(tx, contentKeyEntry);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleted content for message " + messageId);
- }
-
- _environmentFacade.commit(tx);
- _committer.commit(tx, sync);
-
- complete = true;
- tx = null;
- }
- catch (LockConflictException e)
- {
- try
- {
- if(tx != null)
- {
- tx.abort();
- }
- }
- catch(DatabaseException e2)
- {
- LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2);
- // rethrow the original log conflict exception, the secondary exception should already have
- // been logged.
- throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e);
- }
-
-
- LOGGER.warn("Lock timeout exception. Retrying (attempt "
- + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
-
- if(++attempts < LOCK_RETRY_ATTEMPTS)
- {
- if(rand == null)
- {
- rand = new Random();
- }
-
- try
- {
- Thread.sleep(500l + (long)(500l * rand.nextDouble()));
- }
- catch (InterruptedException e1)
- {
-
- }
- }
- else
- {
- // rethrow the lock conflict exception since we could not solve by retrying
- throw _environmentFacade.handleDatabaseException("Cannot remove messages", e);
- }
- }
- }
- while(!complete);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Unexpected BDB exception", e);
-
- try
- {
- abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
- }
- finally
- {
- tx = null;
- }
-
- throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
- }
- finally
- {
- try
- {
- abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
- }
- finally
- {
- tx = null;
- }
- }
- }
-
- private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx)
- {
- try
- {
- if (tx != null)
- {
- tx.abort();
- }
- }
- catch (DatabaseException e1)
- {
- // We need the possible side effect of the handler restarting the environment but don't care about the exception
- _environmentFacade.handleDatabaseException(null, e1);
- LOGGER.warn(errorMessage, e1);
- }
- }
-
- @Override
- public void create(ConfiguredObjectRecord configuredObject) throws StoreException
- {
- checkConfigurationStoreOpen();
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Create " + configuredObject);
- }
-
- com.sleepycat.je.Transaction txn = null;
- try
- {
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
- storeConfiguredObjectEntry(txn, configuredObject);
- txn.commit();
- txn = null;
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject
- + " in database: " + e.getMessage(), e);
- }
- finally
- {
- if (txn != null)
- {
- abortTransactionIgnoringException("Error creating configured object", txn);
- }
- }
- }
-
- @Override
- public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
- {
- checkConfigurationStoreOpen();
-
- com.sleepycat.je.Transaction txn = null;
- try
- {
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
-
- Collection<UUID> removed = new ArrayList<UUID>(objects.length);
- for(ConfiguredObjectRecord record : objects)
- {
- if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS)
- {
- removed.add(record.getId());
- }
- }
-
- txn.commit();
- txn = null;
- return removed.toArray(new UUID[removed.size()]);
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e);
- }
- finally
- {
- if (txn != null)
- {
- abortTransactionIgnoringException("Error deleting configured objects", txn);
- }
- }
-
- }
-
- @Override
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
- {
- checkConfigurationStoreOpen();
-
- com.sleepycat.je.Transaction txn = null;
- try
- {
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
- for(ConfiguredObjectRecord record : records)
- {
- update(createIfNecessary, record, txn);
- }
- txn.commit();
- txn = null;
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e);
- }
- finally
- {
- if (txn != null)
- {
- abortTransactionIgnoringException("Error updating configuration details within the store", txn);
- }
- }
-
- }
-
- private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record);
- }
-
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(record.getId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- DatabaseEntry newValue = new DatabaseEntry();
- ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
-
- OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT);
- final boolean isNewRecord = status == OperationStatus.NOTFOUND;
- if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord))
- {
- // write the updated entry to the store
- configuredObjectBinding.objectToEntry(record, newValue);
- status = getConfiguredObjectsDb().put(txn, key, newValue);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error updating configuration details within the store: " + status);
- }
- if(isNewRecord)
- {
- writeHierarchyRecords(txn, record);
- }
- }
- else if (status != OperationStatus.NOTFOUND)
- {
- throw new StoreException("Error finding configuration details within the store: " + status);
- }
- }
-
- /**
- * Places a message onto a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The the queue to place the message on.
- * @param messageId The message to enqueue.
- *
- * @throws StoreException If the operation fails for any reason.
- */
- private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws StoreException
- {
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
- keyBinding.objectToEntry(dd, key);
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Enqueuing message " + messageId + " on queue "
- + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
- }
- getDeliveryDb().put(tx, key, value);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
- throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
- + queue.getName() + " with id " + queue.getId() + " to database", e);
- }
- }
-
- /**
- * Extracts a message from a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The queue to take the message from.
- * @param messageId The message to dequeue.
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws StoreException
- {
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
- UUID id = queue.getId();
- keyBinding.objectToEntry(queueEntryKey, key);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Dequeue message id " + messageId + " from queue "
- + queue.getName() + " with id " + id);
- }
-
- try
- {
-
- OperationStatus status = getDeliveryDb().delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Unable to find message with id " + messageId + " on queue "
- + queue.getName() + " with id " + id);
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Unable to remove message with id " + messageId + " on queue"
- + queue.getName() + " with id " + id);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removed message " + messageId + " on queue "
- + queue.getName() + " with id " + id);
-
- }
- }
- catch (DatabaseException e)
- {
-
- LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e);
-
- throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e);
- }
- }
-
-
- private void recordXid(com.sleepycat.je.Transaction txn,
- long format,
- byte[] globalId,
- byte[] branchId,
- org.apache.qpid.server.store.Transaction.Record[] enqueues,
- org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
- keyBinding.objectToEntry(xid,key);
-
- DatabaseEntry value = new DatabaseEntry();
- PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- valueBinding.objectToEntry(preparedTransaction, value);
-
- try
- {
- getXidDb().put(txn, key, value);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Failed to write xid: " + e.getMessage(), e);
- throw _environmentFacade.handleDatabaseException("Error writing xid to database", e);
- }
- }
-
- private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
- throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
-
- keyBinding.objectToEntry(xid, key);
-
-
- try
- {
-
- OperationStatus status = getXidDb().delete(txn, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Unable to find xid");
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Unable to remove xid");
- }
-
- }
- catch (DatabaseException e)
- {
-
- LOGGER.error("Failed to remove xid in transaction " + txn, e);
-
- throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e);
- }
- }
-
- /**
- * Commits all operations performed within a given transaction.
- *
- * @param tx The transaction to commit all operations for.
- *
- * @throws StoreException If the operation fails for any reason.
- */
- private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException
- {
- if (tx == null)
- {
- throw new StoreException("Fatal internal error: transactional is null at commitTran");
- }
-
- _environmentFacade.commit(tx);
- StoreFuture result = _committer.commit(tx, syncCommit);
-
- if (LOGGER.isDebugEnabled())
- {
- String transactionType = syncCommit ? "synchronous" : "asynchronous";
- LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
- }
-
- return result;
- }
-
- /**
- * Abandons all operations performed within a given transaction.
- *
- * @param tx The transaction to abandon.
- *
- * @throws StoreException If the operation fails for any reason.
- */
- private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("abortTran called for transaction " + tx);
- }
-
- try
- {
- tx.abort();
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e);
- }
- }
-
- /**
- * Return a valid, currently unused message id.
- *
- * @return A fresh message id.
- */
- private long getNewMessageId()
- {
- return _messageId.incrementAndGet();
- }
-
- /**
- * Stores a chunk of message data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param offset The offset of the data chunk in the message.
- * @param contentBody The content of the data chunk.
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
- ByteBuffer contentBody) throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding messageBinding = ContentBinding.getInstance();
- messageBinding.objectToEntry(contentBody.array(), value);
- try
- {
- OperationStatus status = getMessageContentDb().put(tx, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error adding content for message id " + messageId + ": " + status);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
-
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Stores message meta-data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param messageMetaData The message meta data to store.
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
- StorableMessageMetaData messageMetaData)
- throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("storeMetaData called for transaction " + tx
- + ", messageId " + messageId
- + ", messageMetaData " + messageMetaData);
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
-
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
- messageBinding.objectToEntry(messageMetaData, value);
- try
- {
- getMessageMetaDataDb().put(tx, key, value);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Retrieves message meta-data.
- *
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = "
- + messageId + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
-
- try
- {
- OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Metadata not found for message with id " + messageId);
- }
-
- StorableMessageMetaData mdd = messageBinding.entryToObject(value);
-
- return mdd;
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
- }
- }
-
- /**
- * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
- * from the specified offset in the message.
- *
- * @param messageId The message to get the data for.
- * @param offset The offset of the data within the message.
- * @param dst The destination of the content read back
- *
- * @return The number of bytes inserted into the destination
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
- {
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding contentTupleBinding = ContentBinding.getInstance();
-
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
- }
-
- try
- {
-
- int written = 0;
- OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
- if (status == OperationStatus.SUCCESS)
- {
- byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
- int size = dataAsBytes.length;
- if (offset > size)
- {
- throw new RuntimeException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
-
- }
-
- written = size - offset;
- if(written > dst.remaining())
- {
- written = dst.remaining();
- }
-
- dst.put(dataAsBytes, offset, written);
- }
- return written;
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- @Override
- public boolean isPersistent()
- {
- return true;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
- {
- if(metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(getNewMessageId(), metaData);
- }
- }
-
- private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing configured object record: " + configuredObject);
- }
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
- uuidBinding.objectToEntry(configuredObject.getId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
-
- queueBinding.objectToEntry(configuredObject, value);
- try
- {
- OperationStatus status = getConfiguredObjectsDb().put(txn, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error writing configured object " + configuredObject + " to database: "
- + status);
- }
- writeHierarchyRecords(txn, configuredObject);
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
- + " to database: " + e.getMessage(), e);
- }
- }
-
- private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject)
- {
- OperationStatus status;
- HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance();
- DatabaseEntry hierarchyKey = new DatabaseEntry();
- DatabaseEntry hierarchyValue = new DatabaseEntry();
-
- for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet())
- {
-
- hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey);
- UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue);
- status = getConfiguredObjectHierarchyDb().put(txn, hierarchyKey, hierarchyValue);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: "
- + status);
- }
- }
- }
-
- private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException
- {
- UUID id = record.getId();
- Map<String, ConfiguredObjectRecord> parents = record.getParents();
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removing configured object: " + id);
- }
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
- uuidBinding.objectToEntry(id, key);
- OperationStatus status = getConfiguredObjectsDb().delete(tx, key);
- if(status == OperationStatus.SUCCESS)
- {
- for(String parentType : parents.keySet())
- {
- DatabaseEntry hierarchyKey = new DatabaseEntry();
- HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance();
- keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey);
- getConfiguredObjectHierarchyDb().delete(tx, hierarchyKey);
- }
- }
- return status;
- }
-
- private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
- {
-
- private final long _messageId;
- private final boolean _isRecovered;
-
- private StorableMessageMetaData _metaData;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-
- private byte[] _data;
- private volatile SoftReference<byte[]> _dataRef;
-
- StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
- {
- this(messageId, metaData, false);
- }
-
- StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
- {
- _messageId = messageId;
- _isRecovered = isRecovered;
-
- if(!_isRecovered)
- {
- _metaData = metaData;
- }
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- @Override
- public StorableMessageMetaData getMetaData()
- {
- StorableMessageMetaData metaData = _metaDataRef.get();
- if(metaData == null)
- {
- checkMessageStoreOpen();
-
- metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- @Override
- public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
- {
- src = src.slice();
-
- if(_data == null)
- {
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
- }
- else
- {
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
-
- System.arraycopy(oldData,0,_data,0,oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
- }
-
- }
-
- @Override
- public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
- }
- else
- {
- checkMessageStoreOpen();
-
- return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
- }
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- return ByteBuffer.wrap(data,offsetInMessage,size);
- }
- else
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- int length = getContent(offsetInMessage, buf);
- buf.limit(length);
- buf.position(0);
- return buf;
- }
- }
-
- synchronized void store(com.sleepycat.je.Transaction txn)
- {
- if (!stored())
- {
- try
- {
- _dataRef = new SoftReference<byte[]>(_data);
- BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
- BDBMessageStore.this.addContent(txn, _messageId, 0,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
- }
- finally
- {
- _metaData = null;
- _data = null;
- }
- }
- }
-
- @Override
- public synchronized StoreFuture flushToStore()
- {
- if(!stored())
- {
- checkMessageStoreOpen();
-
- com.sleepycat.je.Transaction txn;
- try
- {
- txn = _environmentFacade.beginTransaction();
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
- }
- store(txn);
- _environmentFacade.commit(txn);
- _committer.commit(txn, true);
-
- storedSizeChangeOccured(getMetaData().getContentSize());
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void remove()
- {
- checkMessageStoreOpen();
-
- int delta = getMetaData().getContentSize();
- BDBMessageStore.this.removeMessage(_messageId, false);
- storedSizeChangeOccured(-delta);
- }
-
- private boolean stored()
- {
- return _metaData == null || _isRecovered;
- }
- }
-
- private class BDBTransaction implements org.apache.qpid.server.store.Transaction
- {
- private com.sleepycat.je.Transaction _txn;
- private int _storeSizeIncrease;
-
- private BDBTransaction() throws StoreException
- {
- _txn = _environmentFacade.beginTransaction();
- }
-
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
- {
- checkMessageStoreOpen();
-
- if(message.getStoredMessage() instanceof StoredBDBMessage)
- {
- final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
- }
-
- BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
- }
-
- @Override
- public void commitTran() throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.commitTranImpl(_txn, true);
- BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
- }
-
- @Override
- public StoreFuture commitTranAsync() throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
- return BDBMessageStore.this.commitTranImpl(_txn, false);
- }
-
- @Override
- public void abortTran() throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.abortTran(_txn);
- }
-
- @Override
- public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
- }
-
- @Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
- Record[] dequeues) throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
- }
- }
-
- private void storedSizeChangeOccured(final int delta) throws StoreException
- {
- try
- {
- storedSizeChange(delta);
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Stored size change exception", e);
- }
- }
-
- private void storedSizeChange(final int delta)
- {
- if(getPersistentSizeHighThreshold() > 0)
- {
- synchronized (this)
- {
- // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
- // time, so we do so only when there's been enough change that it is worth looking again. We do this by
- // assuming the total size will change by less than twice the amount of the message data change.
- long newSize = _totalStoreSize += 2*delta;
-
- if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
- {
- _totalStoreSize = getSizeOnDisk();
-
- if(_totalStoreSize > getPersistentSizeHighThreshold())
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
- }
- else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
- {
- long oldSize = _totalStoreSize;
- _totalStoreSize = getSizeOnDisk();
-
- if(oldSize <= _totalStoreSize)
- {
-
- reduceSizeOnDisk();
-
- _totalStoreSize = getSizeOnDisk();
-
- }
-
- if(_totalStoreSize < getPersistentSizeLowThreshold())
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
-
-
- }
- }
- }
- }
-
- private void checkConfigurationStoreOpen()
- {
- if (!_configurationStoreOpen.get())
- {
- throw new IllegalStateException("Configuration store is not open");
- }
- }
-
- private void checkMessageStoreOpen()
- {
- if (!_messageStoreOpen.get())
- {
- throw new IllegalStateException("Message store is not open");
- }
- }
-
- private void reduceSizeOnDisk()
- {
- _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
- boolean cleaned = false;
- while (_environmentFacade.getEnvironment().cleanLog() > 0)
- {
- cleaned = true;
- }
- if (cleaned)
- {
- CheckpointConfig force = new CheckpointConfig();
- force.setForce(true);
- _environmentFacade.getEnvironment().checkpoint(force);
- }
-
-
- _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
- }
-
- private long getSizeOnDisk()
- {
- return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize();
- }
-
- private long getPersistentSizeLowThreshold()
- {
- return _persistentSizeLowThreshold;
- }
-
- private long getPersistentSizeHighThreshold()
- {
- return _persistentSizeHighThreshold;
- }
-
-
- @Override
- public void onDelete()
- {
- if (!_configurationStoreOpen.get() && !_messageStoreOpen.get())
- {
- if (_storeLocation != null)
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleting store " + _storeLocation);
- }
-
- File location = new File(_storeLocation);
- if (location.exists())
- {
- if (!FileUtils.delete(location, true))
- {
- LOGGER.error("Cannot delete " + _storeLocation);
- }
- }
- }
- }
- }
-
- private Database getConfiguredObjectsDb()
- {
- return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
- }
-
- private Database getConfiguredObjectHierarchyDb()
- {
- return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME);
- }
-
- private Database getMessageContentDb()
- {
- return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME);
- }
-
- private Database getMessageMetaDataDb()
- {
- return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME);
- }
-
- private Database getDeliveryDb()
- {
- return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME);
- }
-
- private Database getXidDb()
- {
- return _environmentFacade.getOpenDatabase(XID_DB_NAME);
- }
-
- class UpgradeTask implements EnvironmentFacadeTask
- {
- private final ConfiguredObject<?> _parent;
-
- public UpgradeTask(ConfiguredObject<?> parent)
- {
- _parent = parent;
- }
-
- @Override
- public void execute(EnvironmentFacade facade)
- {
- try
- {
- new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary();
- }
- catch(DatabaseException e)
- {
- throw facade.handleDatabaseException("Cannot upgrade store", e);
- }
- }
- }
-
- class OpenDatabasesTask implements EnvironmentFacadeTask
- {
- private String[] _names;
-
- public OpenDatabasesTask(String[] names)
- {
- _names = names;
- }
-
- @Override
- public void execute(EnvironmentFacade facade)
- {
- try
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- facade.openDatabases(dbConfig, _names);
- }
- catch(DatabaseException e)
- {
- throw facade.handleDatabaseException("Cannot open databases", e);
- }
- }
-
- }
-
- class DiskSpaceTask implements EnvironmentFacadeTask
- {
-
- @Override
- public void execute(EnvironmentFacade facade)
- {
- try
- {
- _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize();
- }
- catch(DatabaseException e)
- {
- throw facade.handleDatabaseException("Cannot evaluate disk store size", e);
- }
- }
-
- }
-
- public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
- {
- private long _maxId;
-
- @Override
- public void execute(EnvironmentFacade facade)
- {
- visitMessagesInternal(this, facade);
- _messageId.set(_maxId);
- }
-
- @Override
- public boolean handle(StoredMessage<?> storedMessage)
- {
- long id = storedMessage.getMessageNumber();
- if (_maxId<id)
- {
- _maxId = id;
- }
- return true;
- }
-
- }
-
- @Override
- public void visitMessages(MessageHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
- visitMessagesInternal(handler, _environmentFacade);
- }
-
- private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
- {
- Cursor cursor = null;
- try
- {
- cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
- if (!handler.handle(message))
- {
- break;
- }
- }
- }
- catch (DatabaseException e)
- {
- throw environmentFacade.handleDatabaseException("Cannot recover messages", e);
- }
- finally
- {
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch(DatabaseException e)
- {
- throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
- }
- }
- }
- }
-
- @Override
- public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
- {
- QueueEntryKey entry = keyBinding.entryToObject(key);
- entries.add(entry);
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- if (!handler.handle(queueId, messageId))
- {
- break;
- }
- }
-
- }
-
- @Override
- public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- try
- {
- cursor = getXidDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
- preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()))
- {
- break;
- }
- }
-
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index 2d783fa181..067372a228 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -42,13 +42,13 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
@Override
public DurableConfigurationStore createDurableConfigurationStore()
{
- return new BDBMessageStore();
+ return new BDBConfigurationStore();
}
@Override
public MessageStore createMessageStore()
{
- return new BDBMessageStore();
+ return (new BDBConfigurationStore()).getMessageStore();
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index e80d60609f..1be7e97a47 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -27,9 +27,8 @@ import java.lang.reflect.InvocationTargetException;
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -70,7 +69,7 @@ public class Upgrader
if(versionDb.count() == 0L)
{
- int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
+ int sourceVersion = isEmpty ? BDBConfigurationStore.VERSION: identifyOldStoreVersion();
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(sourceVersion, key);
DatabaseEntry value = new DatabaseEntry();
@@ -86,11 +85,11 @@ public class Upgrader
LOGGER.debug("Source message store version is " + version);
}
- if(version > BDBMessageStore.VERSION)
+ if(version > BDBConfigurationStore.VERSION)
{
throw new StoreException("Database version " + version
+ " is higher than the most recent known version: "
- + BDBMessageStore.VERSION);
+ + BDBConfigurationStore.VERSION);
}
performUpgradeFromVersion(version, versionDb);
}
@@ -139,7 +138,7 @@ public class Upgrader
void performUpgradeFromVersion(int sourceVersion, Database versionDb)
throws StoreException
{
- while(sourceVersion != BDBMessageStore.VERSION)
+ while(sourceVersion != BDBConfigurationStore.VERSION)
{
upgrade(sourceVersion, ++sourceVersion);
DatabaseEntry key = new DatabaseEntry();
@@ -191,7 +190,7 @@ public class Upgrader
private int identifyOldStoreVersion() throws DatabaseException
{
- int version = BDBMessageStore.VERSION;
+ int version = BDBConfigurationStore.VERSION;
for (String databaseName : _environment.getDatabaseNames())
{
if (databaseName.contains("_v"))
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
index 974ff51a3b..af82d9fd2d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
@@ -31,7 +31,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@@ -42,7 +42,7 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
{
public static final String TYPE = "BDB_HA";
- private final BDBMessageStore _messageStore;
+ private final BDBConfigurationStore _configurationStore;
private MessageStoreLogSubject _messageStoreLogSubject;
@ManagedAttributeField(afterSet="setLocalTransactionSynchronizationPolicyOnEnvironment")
@@ -56,8 +56,8 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
{
super(attributes, virtualHostNode);
- _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore();
- _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
+ _configurationStore = (BDBConfigurationStore) virtualHostNode.getConfigurationStore();
+ _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _configurationStore.getMessageStore().getClass().getSimpleName());
}
@Override
@@ -68,13 +68,13 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
@Override
public DurableConfigurationStore getDurableConfigurationStore()
{
- return _messageStore;
+ return _configurationStore;
}
@Override
public MessageStore getMessageStore()
{
- return _messageStore;
+ return _configurationStore.getMessageStore();
}
@Override
@@ -176,7 +176,7 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
{
- return (ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade();
+ return (ReplicatedEnvironmentFacade) _configurationStore.getEnvironmentFacade();
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 91ac63ff46..e36def581b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -42,7 +42,6 @@ import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.HostPortPair;
-
import org.apache.log4j.Logger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
@@ -59,7 +58,7 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener;
@@ -265,9 +264,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
- public BDBMessageStore getConfigurationStore()
+ public BDBConfigurationStore getConfigurationStore()
{
- return (BDBMessageStore) super.getConfigurationStore();
+ return (BDBConfigurationStore) super.getConfigurationStore();
}
protected ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
@@ -278,7 +277,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@Override
protected DurableConfigurationStore createConfigurationStore()
{
- return new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
+ return new BDBConfigurationStore(new ReplicatedEnvironmentFacadeFactory());
}
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 962e19f81c..0254cbc909 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -165,8 +165,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
DurableConfigurationStore store = node.getConfigurationStore();
assertNotNull(store);
- BDBMessageStore bdbMessageStore = (BDBMessageStore) store;
- ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbMessageStore.getEnvironmentFacade().getEnvironment();
+ BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) store;
+ ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment();
ReplicationConfig replicationConfig = environment.getRepConfig();
assertEquals(nodeName, environment.getNodeName());
@@ -181,7 +181,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost();
assertNotNull("Virtual host child was not added", virtualHost);
assertEquals("Unexpected virtual host name", groupName, virtualHost.getName());
- assertEquals("Unexpected virtual host store", store, virtualHost.getMessageStore());
+ assertEquals("Unexpected virtual host store", bdbConfigurationStore.getMessageStore(), virtualHost.getMessageStore());
assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState());
node.stop();
@@ -212,8 +212,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
BDBHAVirtualHostNode<?> node = createAndStartHaVHN(attributes);
- BDBMessageStore bdbMessageStore = (BDBMessageStore) node.getConfigurationStore();
- ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbMessageStore.getEnvironmentFacade().getEnvironment();
+ BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) node.getConfigurationStore();
+ ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment();
assertEquals("Unexpected node priority value before mutation", 1, environment.getRepMutableConfig().getNodePriority());
assertFalse("Unexpected designated primary value before mutation", environment.getRepMutableConfig().getDesignatedPrimary());
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
index e1678e6f65..8657a3a0b1 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
@@ -28,6 +28,6 @@ public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurati
@Override
protected DurableConfigurationStore createConfigStore() throws Exception
{
- return new BDBMessageStore();
+ return new BDBConfigurationStore();
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
index f2de01445d..2afdaa4dd5 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -74,7 +74,7 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB
@Override
protected MessageStore createStore() throws Exception
{
- MessageStore store = new BDBMessageStore();
+ MessageStore store = (new BDBConfigurationStore()).getMessageStore();
return store;
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 6fba1b215e..d623da846c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -50,6 +50,7 @@ import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore.BDBMessageStore;
/**
* Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
@@ -81,7 +82,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
*/
public void testBDBMessagePersistence() throws Exception
{
- BDBMessageStore bdbStore = (BDBMessageStore)getStore();
+ MessageStore bdbStore = getStore();
// Create content ByteBuffers.
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -342,7 +343,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
*/
public void testMessageCreationAndRemoval() throws Exception
{
- BDBMessageStore bdbStore = (BDBMessageStore)getStore();
+ BDBMessageStore bdbStore = (BDBMessageStore) getStore();
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -430,7 +431,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return new BDBMessageStore();
+ return new BDBConfigurationStore().getMessageStore();
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
index c407be50c6..5b869b67dc 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
@@ -26,7 +26,8 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.OperationStatus;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
+import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
@@ -94,7 +95,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
catch(ServerScopedRuntimeException ex)
{
assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: "
- + BDBMessageStore.VERSION, ex.getMessage());
+ + BDBConfigurationStore.VERSION, ex.getMessage());
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
index 4b9a8d19a8..54dd08fd98 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
@@ -24,7 +24,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import com.sleepycat.bind.tuple.IntegerBinding;
@@ -95,7 +95,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
{
assertEquals("Unexpected store version", -1, getStoreVersion(_environment));
_upgrader.upgradeIfNecessary();
- assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion(_environment));
+ assertEquals("Unexpected store version", BDBConfigurationStore.VERSION, getStoreVersion(_environment));
assertContent();
}
@@ -115,7 +115,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
List<String> expectedDatabases = new ArrayList<String>();
expectedDatabases.add(Upgrader.VERSION_DB_NAME);
assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
- assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion(emptyEnvironment));
+ assertEquals("Unexpected store version", BDBConfigurationStore.VERSION, getStoreVersion(emptyEnvironment));
}
finally
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e0c1f77d2b..1db2c89d05 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -46,15 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -67,7 +58,17 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializerProvider;
import org.codehaus.jackson.map.module.SimpleModule;
-abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+
+abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, DurableConfigurationStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
@@ -369,7 +370,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
if (_messageStoreOpen.compareAndSet(false, true))
@@ -952,7 +952,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- @Override
public void closeMessageStore()
{
if (_messageStoreOpen.compareAndSet(true, false))
@@ -978,7 +977,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract void doClose();
- @Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
checkMessageStoreOpen();
@@ -1132,7 +1130,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract Connection getConnection() throws SQLException;
- @Override
public Transaction newTransaction()
{
checkMessageStoreOpen();
@@ -1665,7 +1662,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- @Override
public boolean isPersistent()
{
return true;
@@ -1975,7 +1971,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void addEventListener(EventListener eventListener, Event... events)
{
_eventManager.addEventListener(eventListener, events);
@@ -2250,7 +2245,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2301,7 +2295,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2346,7 +2339,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2447,8 +2439,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract void storedSizeChange(int storeSizeIncrease);
-
- @Override
public void onDelete()
{
// TODO should probably check we are closed
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
index e2c43f5012..e69de29bb2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
@@ -1,371 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.Transaction.Record;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-
-/** A simple message store that stores the messages in a thread-safe structure in memory. */
-abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore
-{
- private final class MemoryMessageStoreTransaction implements Transaction
- {
- private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
- private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
-
- private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
- private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
-
- @Override
- public StoreFuture commitTranAsync()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
- if (messageIds == null)
- {
- messageIds = new HashSet<Long>();
- _localEnqueueMap.put(queue.getId(), messageIds);
- }
- messageIds.add(message.getMessageNumber());
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- Set<Long> messageIds = _localDequeueMap.get(queue.getId());
- if (messageIds == null)
- {
- messageIds = new HashSet<Long>();
- _localDequeueMap.put(queue.getId(), messageIds);
- }
- messageIds.add(message.getMessageNumber());
- }
-
- @Override
- public void commitTran()
- {
- commitTransactionInternal(this);
- _localEnqueueMap.clear();
- _localDequeueMap.clear();
- }
-
- @Override
- public void abortTran()
- {
- _localEnqueueMap.clear();
- _localDequeueMap.clear();
- }
-
- @Override
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
- }
-
- @Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- {
- _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
- }
- }
-
- private final AtomicLong _messageId = new AtomicLong(1);
-
- private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
-
- protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
-
- private Object _transactionLock = new Object();
- private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
- private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
-
- @SuppressWarnings("unchecked")
- @Override
- public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData)
- {
- long id = _messageId.getAndIncrement();
-
- if(metaData.isPersistent())
- {
- return new StoredMemoryMessage(id, metaData)
- {
-
- @Override
- public StoreFuture flushToStore()
- {
- _messages.putIfAbsent(getMessageNumber(), this) ;
- return super.flushToStore();
- }
-
- @Override
- public void remove()
- {
- _messages.remove(getMessageNumber());
- super.remove();
- }
-
- };
- }
- else
- {
- return new StoredMemoryMessage(id, metaData);
- }
- }
-
- private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
- {
- synchronized (_transactionLock )
- {
- for (Map.Entry<UUID, Set<Long>> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet())
- {
- Set<Long> messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey());
- if (messageIds == null)
- {
- messageIds = new HashSet<Long>();
- _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds);
- }
- messageIds.addAll(loacalEnqueuedEntry.getValue());
- }
-
- for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
- {
- Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
- if (messageIds != null)
- {
- messageIds.removeAll(loacalDequeueEntry.getValue());
- if (messageIds.isEmpty())
- {
- _messageInstances.remove(loacalDequeueEntry.getKey());
- }
- }
- }
-
- for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
- {
- _distributedTransactions.put(entry.getKey(), entry.getValue());
- }
-
- for (Xid removed : transaction._localDistributedTransactionsRemoves)
- {
- _distributedTransactions.remove(removed);
- }
-
- }
-
-
- }
-
- @Override
- public Transaction newTransaction()
- {
- return new MemoryMessageStoreTransaction();
- }
-
- @Override
- public boolean isPersistent()
- {
- return false;
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- }
-
- @Override
- public void create(ConfiguredObjectRecord record)
- {
- if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null)
- {
- throw new StoreException("Record with id " + record.getId() + " is already present");
- }
- }
-
- @Override
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
- {
- for (ConfiguredObjectRecord record : records)
- {
- ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record);
- if (previousValue == null && !createIfNecessary)
- {
- throw new StoreException("Record with id " + record.getId() + " does not exist");
- }
- }
- }
-
- @Override
- public UUID[] remove(final ConfiguredObjectRecord... objects)
- {
- List<UUID> removed = new ArrayList<UUID>();
- for (ConfiguredObjectRecord record : objects)
- {
- if (_configuredObjectRecords.remove(record.getId()) != null)
- {
- removed.add(record.getId());
- }
- }
- return removed.toArray(new UUID[removed.size()]);
- }
-
- @Override
- public void closeConfigurationStore()
- {
- _configuredObjectRecords.clear();
- }
-
- @Override
- public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
- {
- }
-
- @Override
- public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
- {
- handler.begin();
- for (ConfiguredObjectRecord record : _configuredObjectRecords.values())
- {
- if (!handler.handle(record))
- {
- break;
- }
- }
- handler.end();
- }
-
- @Override
- public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
- {
- }
-
- @Override
- public void closeMessageStore()
- {
- _messages.clear();
- synchronized (_transactionLock)
- {
- _messageInstances.clear();
- _distributedTransactions.clear();
- }
- }
-
- @Override
- public String getStoreLocation()
- {
- return null;
- }
-
- @Override
- public void onDelete()
- {
- }
-
- @Override
- public void visitMessages(MessageHandler handler) throws StoreException
- {
- for (StoredMemoryMessage message : _messages.values())
- {
- if(!handler.handle(message))
- {
- break;
- }
- }
- }
-
- @Override
- public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
- {
- synchronized (_transactionLock)
- {
- for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
- {
- UUID resourceId = enqueuedEntry.getKey();
- for (Long messageId : enqueuedEntry.getValue())
- {
- if (!handler.handle(resourceId, messageId))
- {
- return;
- }
- }
- }
- }
- }
-
- @Override
- public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
- {
- synchronized (_transactionLock)
- {
- for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
- {
- Xid xid = entry.getKey();
- DistributedTransactionRecords records = entry.getValue();
- if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues()))
- {
- break;
- }
- }
- }
- }
-
- private static final class DistributedTransactionRecords
- {
- private Record[] _enqueues;
- private Record[] _dequeues;
-
- public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues)
- {
- super();
- _enqueues = enqueues;
- _dequeues = dequeues;
- }
-
- public Record[] getEnqueues()
- {
- return _enqueues;
- }
-
- public Record[] getDequeues()
- {
- return _dequeues;
- }
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java
new file mode 100644
index 0000000000..267e1d9cb3
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
+abstract class AbstractMemoryStore implements DurableConfigurationStore, MessageStoreProvider
+{
+ private final MessageStore _messageStore = new MemoryMessageStore();
+
+
+ private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
+
+
+
+ @Override
+ public void create(ConfiguredObjectRecord record)
+ {
+ if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null)
+ {
+ throw new StoreException("Record with id " + record.getId() + " is already present");
+ }
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
+ {
+ for (ConfiguredObjectRecord record : records)
+ {
+ ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record);
+ if (previousValue == null && !createIfNecessary)
+ {
+ throw new StoreException("Record with id " + record.getId() + " does not exist");
+ }
+ }
+ }
+
+ @Override
+ public UUID[] remove(final ConfiguredObjectRecord... objects)
+ {
+ List<UUID> removed = new ArrayList<UUID>();
+ for (ConfiguredObjectRecord record : objects)
+ {
+ if (_configuredObjectRecords.remove(record.getId()) != null)
+ {
+ removed.add(record.getId());
+ }
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ @Override
+ public void closeConfigurationStore()
+ {
+ _configuredObjectRecords.clear();
+ }
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+ {
+ handler.begin();
+ for (ConfiguredObjectRecord record : _configuredObjectRecords.values())
+ {
+ if (!handler.handle(record))
+ {
+ break;
+ }
+ }
+ handler.end();
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
new file mode 100644
index 0000000000..44d640ca86
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+
+/** A simple message store that stores the messages in a thread-safe structure in memory. */
+public class MemoryMessageStore implements MessageStore
+{
+ private final AtomicLong _messageId = new AtomicLong(1);
+
+ private final ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
+ private final Object _transactionLock = new Object();
+ private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
+ private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+
+
+ private final class MemoryMessageStoreTransaction implements Transaction
+ {
+ private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
+ private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
+
+ private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
+ private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
+
+ @Override
+ public StoreFuture commitTranAsync()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ {
+ Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localEnqueueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
+ }
+
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ {
+ Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localDequeueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
+ }
+
+ @Override
+ public void commitTran()
+ {
+ commitTransactionInternal(this);
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
+ }
+
+ @Override
+ public void abortTran()
+ {
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
+ }
+
+ @Override
+ public void removeXid(long format, byte[] globalId, byte[] branchId)
+ {
+ _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
+ }
+
+ @Override
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
+ }
+ }
+
+ private static final class DistributedTransactionRecords
+ {
+ private Transaction.Record[] _enqueues;
+ private Transaction.Record[] _dequeues;
+
+ public DistributedTransactionRecords(Transaction.Record[] enqueues, Transaction.Record[] dequeues)
+ {
+ super();
+ _enqueues = enqueues;
+ _dequeues = dequeues;
+ }
+
+ public Transaction.Record[] getEnqueues()
+ {
+ return _enqueues;
+ }
+
+ public Transaction.Record[] getDequeues()
+ {
+ return _dequeues;
+ }
+ }
+
+ private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
+ {
+ synchronized (_transactionLock )
+ {
+ for (Map.Entry<UUID, Set<Long>> localEnqueuedEntry : transaction._localEnqueueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(localEnqueuedEntry.getKey());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _messageInstances.put(localEnqueuedEntry.getKey(), messageIds);
+ }
+ messageIds.addAll(localEnqueuedEntry.getValue());
+ }
+
+ for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
+ if (messageIds != null)
+ {
+ messageIds.removeAll(loacalDequeueEntry.getValue());
+ if (messageIds.isEmpty())
+ {
+ _messageInstances.remove(loacalDequeueEntry.getKey());
+ }
+ }
+ }
+
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
+ {
+ _distributedTransactions.put(entry.getKey(), entry.getValue());
+ }
+
+ for (Xid removed : transaction._localDistributedTransactionsRemoves)
+ {
+ _distributedTransactions.remove(removed);
+ }
+
+ }
+
+
+ }
+
+
+ @Override
+ public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+ {
+ long id = _messageId.getAndIncrement();
+
+ if(metaData.isPersistent())
+ {
+ return new StoredMemoryMessage<T>(id, metaData)
+ {
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ _messages.putIfAbsent(getMessageNumber(), this) ;
+ return super.flushToStore();
+ }
+
+ @Override
+ public void remove()
+ {
+ _messages.remove(getMessageNumber());
+ super.remove();
+ }
+
+ };
+ }
+ else
+ {
+ return new StoredMemoryMessage<T>(id, metaData);
+ }
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new MemoryMessageStoreTransaction();
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ _messages.clear();
+ synchronized (_transactionLock)
+ {
+ _messageInstances.clear();
+ _distributedTransactions.clear();
+ }
+ }
+
+ @Override
+ public void addEventListener(final EventListener eventListener, final Event... events)
+ {
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return null;
+ }
+
+ @Override
+ public void onDelete()
+ {
+ }
+
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ for (StoredMemoryMessage message : _messages.values())
+ {
+ if(!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
+ {
+ UUID resourceId = enqueuedEntry.getKey();
+ for (Long messageId : enqueuedEntry.getValue())
+ {
+ if (!handler.handle(resourceId, messageId))
+ {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+ {
+ Xid xid = entry.getKey();
+ DistributedTransactionRecords records = entry.getValue();
+ if (!handler.handle(xid.getFormat(),
+ xid.getGlobalId(),
+ xid.getBranchId(),
+ records.getEnqueues(),
+ records.getDequeues()))
+ {
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java
new file mode 100644
index 0000000000..94d58013d2
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public interface MessageStoreProvider
+{
+ MessageStore getMessageStore();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index e7302270bb..f4e1376980 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -23,13 +23,13 @@ package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
-public class StoredMemoryMessage implements StoredMessage
+public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
{
private final long _messageNumber;
private ByteBuffer _content;
- private final StorableMessageMetaData _metaData;
+ private final T _metaData;
- public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData)
+ public StoredMemoryMessage(long messageNumber, T metaData)
{
_messageNumber = messageNumber;
_metaData = metaData;
@@ -128,7 +128,7 @@ public class StoredMemoryMessage implements StoredMessage
}
- public StorableMessageMetaData getMetaData()
+ public T getMetaData()
{
return _metaData;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index d27cd1c13e..9e10d5e424 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -77,6 +77,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -196,13 +197,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
DurableConfigurationStore durableConfigurationStore = _virtualHostNode.getConfigurationStore();
+ // TODO attribute messageStoreProvider is to be removed
boolean nodeIsMessageStoreProvider = _virtualHostNode.isMessageStoreProvider();
if (nodeIsMessageStoreProvider)
{
- if (!(durableConfigurationStore instanceof MessageStore))
+ if (!(durableConfigurationStore instanceof MessageStoreProvider))
{
throw new IllegalConfigurationException("Virtual host node " + _virtualHostNode.getName()
- + " is configured as a provider of message store but the MessageStore interface is not implemented on a configuration store of type "
+ + " is configured as a provider of message store but the MessageStoreProvider interface is not implemented on a configuration store of type "
+ durableConfigurationStore.getClass().getName());
}
}
@@ -215,7 +217,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
+ ". You can either configure the message store setting on the host or "
+ (durableConfigurationStore instanceof MessageStore ?
" configure VirtualHostNode " + _virtualHostNode.getName() + " as a provider of message store" :
- " change the node type to one having configuration store implementing the MessageStore inteface") );
+ " change the node type to one having configuration store implementing the MessageStore interface") );
}
String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE);
if (storeType == null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index 62e545659b..b9356b368b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
@ManagedObject( category = false, type = "STANDARD")
public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost>
@@ -96,7 +97,7 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost
VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
if (virtualHostNode.isMessageStoreProvider())
{
- _messageStore = (MessageStore)virtualHostNode.getConfigurationStore();
+ _messageStore = ((MessageStoreProvider)virtualHostNode.getConfigurationStore()).getMessageStore();
}
else
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index f52558e298..c87d24f9c6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -50,6 +50,7 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<X>> extends AbstractConfiguredObject<X> implements VirtualHostNode<X>
{
@@ -198,9 +199,9 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
deleteVirtualHostIfExists();
close();
deleted();
- if (getConfigurationStore() instanceof MessageStore)
+ if (getConfigurationStore() instanceof MessageStoreProvider)
{
- ((MessageStore)getConfigurationStore()).onDelete();
+ ((MessageStoreProvider)getConfigurationStore()).getMessageStore().onDelete();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java
new file mode 100644
index 0000000000..4cb85dafd4
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.MessageStore;
+
+@ManagedObject( category = false )
+public interface MessageStoreProvidingVirtualHostNode<X extends MessageStoreProvidingVirtualHostNode<X>>
+ extends VirtualHostNode<X>
+{
+ MessageStore getProvidedMessageStore();
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
new file mode 100644
index 0000000000..8fd3cbb1fe
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class MemoryMessageStoreTest extends MessageStoreTestCase
+{
+
+ @Override
+ protected Map<String, Object> getStoreSettings() throws Exception
+ {
+ return Collections.<String, Object>emptyMap();
+ }
+
+ @Override
+ protected MessageStore createMessageStore()
+ {
+ return new MemoryMessageStore();
+ }
+
+ @Override
+ protected void reopenStore() throws Exception
+ {
+ // cannot re-open memory message store as it is not persistent
+ }
+
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index bfa4e1d52e..12b21fa964 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.server.store.handler.MessageHandler;
-/** A simple message store that stores the messages in a thread-safe structure in memory. */
-public class TestMemoryMessageStore extends AbstractMemoryMessageStore
+/**
+ * A simple message store that stores the messages in a thread-safe structure in memory.
+ */
+public class TestMemoryMessageStore extends MemoryMessageStore
{
public static final String TYPE = "TestMemory";
@@ -34,15 +36,14 @@ public class TestMemoryMessageStore extends AbstractMemoryMessageStore
{
final AtomicInteger counter = new AtomicInteger();
visitMessages(new MessageHandler()
- {
-
- @Override
- public boolean handle(StoredMessage<?> storedMessage)
- {
- counter.incrementAndGet();
- return true;
- }
- });
+ {
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ counter.incrementAndGet();
+ return true;
+ }
+ });
return counter.get();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java
index d7c11ea226..df17884495 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java
@@ -49,7 +49,9 @@ public class TestMemoryMessageStoreFactory implements MessageStoreFactory, Durab
@Override
public DurableConfigurationStore createDurableConfigurationStore()
{
- return new TestMemoryMessageStore();
+ return new AbstractMemoryStore()
+ {
+ };
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index e1e37ad3bd..47456f2675 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -21,6 +21,10 @@
package org.apache.qpid.server.protocol.v0_8;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Set;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -39,10 +43,6 @@ import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Set;
-
/**
* Tests that acknowledgements are handled correctly.
*/
@@ -90,8 +90,6 @@ public class AckTest extends QpidTestCase
{
for (int i = 1; i <= count; i++)
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
MessagePublishInfo publishBody = new MessagePublishInfo()
{
@@ -130,13 +128,15 @@ public class AckTest extends QpidTestCase
b.setDeliveryMode((byte) 2);
}
- // we increment the reference here since we are not delivering the messaging to any queues, which is where
- // the reference is normally incremented. The test is easier to construct if we have direct access to the
- // subscription
+ // The test is easier to construct if we have direct access to the subscription
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
- MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
- final StoredMessage storedMessage = _messageStore.addMessage(mmd);
+
+ final MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
+
+ final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd);
+
+ final StoredMessage storedMessage = result;
final AMQMessage message = new AMQMessage(storedMessage);
ServerTransaction txn = new AutoCommitTransaction(_messageStore);
txn.enqueue(_queue, message,
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index e9c37e7b42..69b3069ddb 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -81,7 +81,8 @@ public class ReferenceCountingTest extends QpidTestCase
- MessageMetaData mmd = new MessageMetaData(info, chb);
+ final MessageMetaData mmd = new MessageMetaData(info, chb);
+
StoredMessage storedMessage = _store.addMessage(mmd);
storedMessage.flushToStore();
@@ -139,7 +140,8 @@ public class ReferenceCountingTest extends QpidTestCase
final ContentHeaderBody chb = createPersistentContentHeader();
- MessageMetaData mmd = new MessageMetaData(info, chb);
+ final MessageMetaData mmd = new MessageMetaData(info, chb);
+
StoredMessage storedMessage = _store.addMessage(mmd);
storedMessage.flushToStore();
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index d682076350..38b4c66ebe 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -35,11 +35,21 @@ import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.util.FileUtils;
/**
@@ -47,7 +57,8 @@ import org.apache.qpid.util.FileUtils;
* mechanism.
*
*/
-public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
+public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider,
+ DurableConfigurationStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -72,6 +83,8 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
private String _storeLocation;
private Class<Driver> _driverClass;
+ private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
+
public DerbyMessageStore()
{
}
@@ -239,8 +252,6 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
}
}
-
- @Override
public String getStoreLocation()
{
return _storeLocation;
@@ -446,4 +457,81 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
{
return DriverManager.getConnection(_connectionURL);
}
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStoreFacade;
+ }
+
+ private class MessageStoreWrapper implements MessageStore
+ {
+
+ @Override
+ public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ DerbyMessageStore.this.openMessageStore(parent, messageStoreSettings);
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+ {
+ return DerbyMessageStore.this.addMessage(metaData);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return DerbyMessageStore.this.isPersistent();
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return DerbyMessageStore.this.newTransaction();
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ DerbyMessageStore.this.closeMessageStore();
+ }
+
+ @Override
+ public void addEventListener(final EventListener eventListener, final Event... events)
+ {
+ DerbyMessageStore.this.addEventListener(eventListener, events);
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return DerbyMessageStore.this.getStoreLocation();
+ }
+
+ @Override
+ public void onDelete()
+ {
+ DerbyMessageStore.this.onDelete();
+ }
+
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ DerbyMessageStore.this.visitMessages(handler);
+ }
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ DerbyMessageStore.this.visitMessageInstances(handler);
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ DerbyMessageStore.this.visitDistributedTransactions(handler);
+ }
+ }
+
}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
index 13c897135d..9bc3780a71 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
@@ -48,7 +48,7 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory, DurableCon
@Override
public MessageStore createMessageStore()
{
- return new DerbyMessageStore();
+ return (new DerbyMessageStore()).getMessageStore();
}
@Override
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
index ba7ae26292..1d35b9ef83 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
@@ -50,7 +50,7 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes
@Override
protected MessageStore createStore() throws Exception
{
- return new DerbyMessageStore();
+ return (new DerbyMessageStore()).getMessageStore();
}
@Override
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 9a2d945494..4594b7f223 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -83,7 +83,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return new DerbyMessageStore();
+ return (new DerbyMessageStore()).getMessageStore();
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index d70f2a3d78..ddafa83bb3 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -31,12 +31,22 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.util.MapValueConverter;
/**
@@ -44,7 +54,7 @@ import org.apache.qpid.server.util.MapValueConverter;
* mechanism.
*
*/
-public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore
+public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider
{
private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class);
@@ -60,6 +70,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
protected String _connectionURL;
private ConnectionProvider _connectionProvider;
+ private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
private static class JDBCDetails
{
@@ -331,7 +342,6 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
{
}
- @Override
public String getStoreLocation()
{
return _connectionURL;
@@ -428,4 +438,80 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
}
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStoreFacade;
+ }
+
+ private class MessageStoreWrapper implements MessageStore
+ {
+
+ @Override
+ public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings);
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+ {
+ return JDBCMessageStore.this.addMessage(metaData);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return JDBCMessageStore.this.isPersistent();
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return JDBCMessageStore.this.newTransaction();
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ JDBCMessageStore.this.closeMessageStore();
+ }
+
+ @Override
+ public void addEventListener(final EventListener eventListener, final Event... events)
+ {
+ JDBCMessageStore.this.addEventListener(eventListener, events);
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return JDBCMessageStore.this.getStoreLocation();
+ }
+
+ @Override
+ public void onDelete()
+ {
+ JDBCMessageStore.this.onDelete();
+ }
+
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ JDBCMessageStore.this.visitMessages(handler);
+ }
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ JDBCMessageStore.this.visitMessageInstances(handler);
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ JDBCMessageStore.this.visitDistributedTransactions(handler);
+ }
+ }
+
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
index e1db859a98..ab7ac6c671 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
@@ -42,7 +42,7 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory, DurableConf
@Override
public MessageStore createMessageStore()
{
- return new JDBCMessageStore();
+ return (new JDBCMessageStore()).getMessageStore();
}
@Override
diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 2322fa7102..1f03b7f75f 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return new JDBCMessageStore();
+ return (new JDBCMessageStore()).getMessageStore();
}
private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException
diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java
new file mode 100644
index 0000000000..0b0e6705d5
--- /dev/null
+++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+
+/** A simple message store that stores the messages in a thread-safe structure in memory. */
+public class MemoryConfigurationStore extends AbstractMemoryStore
+{
+ public static final String TYPE = "Memory";
+
+}
diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index c8dd2e6e61..e69de29bb2 100644
--- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-
-/** A simple message store that stores the messages in a thread-safe structure in memory. */
-public class MemoryMessageStore extends AbstractMemoryMessageStore
-{
- public static final String TYPE = "Memory";
-
-}
diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
index d5d5969a47..8148ff9371 100644
--- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
+++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
@@ -33,7 +33,7 @@ public class MemoryMessageStoreFactory implements MessageStoreFactory, DurableCo
@Override
public String getType()
{
- return MemoryMessageStore.TYPE;
+ return MemoryConfigurationStore.TYPE;
}
@Override
@@ -50,7 +50,7 @@ public class MemoryMessageStoreFactory implements MessageStoreFactory, DurableCo
@Override
public DurableConfigurationStore createDurableConfigurationStore()
{
- return new MemoryMessageStore();
+ return new MemoryConfigurationStore();
}
@Override
diff --git a/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
index 8fd3cbb1fe..e69de29bb2 100644
--- a/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
+++ b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import java.util.Collections;
-import java.util.Map;
-
-public class MemoryMessageStoreTest extends MessageStoreTestCase
-{
-
- @Override
- protected Map<String, Object> getStoreSettings() throws Exception
- {
- return Collections.<String, Object>emptyMap();
- }
-
- @Override
- protected MessageStore createMessageStore()
- {
- return new MemoryMessageStore();
- }
-
- @Override
- protected void reopenStore() throws Exception
- {
- // cannot re-open memory message store as it is not persistent
- }
-
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 0993783e54..b6dd1b1b71 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -27,12 +27,12 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.model.ConfiguredObject;
-public class QuotaMessageStore extends AbstractMemoryMessageStore
+public class QuotaMessageStore extends MemoryMessageStore
{
public static final String TYPE = "QuotaMessageStore";
private final AtomicLong _messageId = new AtomicLong(1);
- private long _totalStoreSize;;
+ private long _totalStoreSize;
private boolean _limitBusted;
private long _persistentSizeLowThreshold;
private long _persistentSizeHighThreshold;
@@ -66,12 +66,11 @@ public class QuotaMessageStore extends AbstractMemoryMessageStore
}
- @SuppressWarnings("unchecked")
@Override
- public StoredMessage<StorableMessageMetaData> addMessage(StorableMessageMetaData metaData)
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
final long id = _messageId.getAndIncrement();
- return new StoredMemoryMessage(id, metaData);
+ return new StoredMemoryMessage<T>(id, metaData);
}
@Override
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index ba00c430ed..0a686926e3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -146,7 +146,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
_defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY)));
}
- final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryMessageStore.TYPE : messageStoreSettings.get(REAL_STORE);
+ final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryConfigurationStore.TYPE : messageStoreSettings.get(REAL_STORE);
final String realStore = (String) realStoreAttr;
_realMessageStore = MessageStoreFactory.FACTORY_LOADER.get(realStore).createMessageStore();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 1aee32db1b..e9f33a1658 100755
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -67,7 +67,7 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MemoryConfigurationStore;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.FileUtils;
import org.apache.qpid.util.SystemUtils;
@@ -854,7 +854,7 @@ public class QpidBrokerTestCase extends QpidTestCase
{
storeDir = ":memory:";
}
- else if (!MemoryMessageStore.TYPE.equals(storeType))
+ else if (!MemoryConfigurationStore.TYPE.equals(storeType))
{
storeDir = "${QPID_WORK}" + File.separator + virtualHostNodeName + File.separator + brokerPort;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java
index e4d4e5b24e..0dcfbf2be3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java
@@ -32,14 +32,13 @@ import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.JsonFileConfigStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MemoryConfigurationStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHost;
import org.apache.qpid.util.FileUtils;
import org.apache.qpid.util.Strings;
@@ -84,7 +83,7 @@ public class TestUtils
config.setObjectAttribute(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHostNode.IS_MESSAGE_STORE_PROVIDER, false);
// If using MMS, switch to split store with JSON config store.
- if (MemoryMessageStore.TYPE.equals(configStoreType))
+ if (MemoryConfigurationStore.TYPE.equals(configStoreType))
{
configStoreType = JsonFileConfigStore.TYPE;
}