summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-19 12:07:14 +0000
committerKeith Wall <kwall@apache.org>2014-06-19 12:07:14 +0000
commita2f02366378181d75ede5dd02246e0c690483329 (patch)
treebe39a4bc026f8f8ab2bb94735a1bf95c4c455236 /qpid/java
parent5ee6c9aec2fe07ed61fae0436b1956c454dd9f25 (diff)
downloadqpid-python-a2f02366378181d75ede5dd02246e0c690483329.tar.gz
QPID-5800: [Java Broker] Refactor BDB message store implementation to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with a BDB virtual host. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603849 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java1219
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java1239
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java176
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java64
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java3
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java4
7 files changed, 1507 insertions, 1200 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
new file mode 100644
index 0000000000..4072c483b2
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -0,0 +1,1219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.*;
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely;
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely;
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
+
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
+import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Xid;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+
+
+public abstract class AbstractBDBMessageStore implements MessageStore
+{
+
+ private static final int LOCK_RETRY_ATTEMPTS = 5;
+
+ private static final String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
+ private static final String MESSAGE_META_DATA_SEQ_DB_NAME = "MESSAGE_METADATA.SEQ";
+ private static final String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
+ private static final String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
+
+ //TODO: Add upgrader to remove BRIDGES and LINKS
+ private static final String BRIDGEDB_NAME = "BRIDGES";
+ private static final String LINKDB_NAME = "LINKS";
+ private static final String XID_DB_NAME = "XIDS";
+
+ private final EventManager _eventManager = new EventManager();
+
+ private final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(
+ Charset.forName("UTF-8")));
+
+ private final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT.
+ setAllowCreate(true).
+ setInitialValue(1).
+ setWrap(true).
+ setCacheSize(100000);
+
+ private boolean _limitBusted;
+ private long _totalStoreSize;
+
+ @Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ try
+ {
+ new Upgrader(getEnvironmentFacade().getEnvironment(), getParent()).upgradeIfNecessary();
+ }
+ catch(DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot upgrade store", e);
+ }
+
+ // TODO this relies on the fact that the VH will call upgrade just before putting the VH into service.
+ _totalStoreSize = getSizeOnDisk();
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ {
+
+ long newMessageId = getNextMessageSequenceNumber();
+
+ if (metaData.isPersistent())
+ {
+ return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage<T>(newMessageId, metaData);
+ }
+ }
+
+ private long getNextMessageSequenceNumber()
+ {
+ long newMessageId;
+ try
+ {
+ // The implementations of sequences mean that there is only a transaction
+ // after every n sequence values, where n is the MESSAGE_METADATA_SEQ_CONFIG.getCacheSize()
+
+ Sequence mmdSeq = getEnvironmentFacade().openSequence(getMessageMetaDataSeqDb(),
+ MESSAGE_METADATA_SEQ_KEY,
+ MESSAGE_METADATA_SEQ_CONFIG);
+ newMessageId = mmdSeq.get(null, 1);
+ }
+ catch (DatabaseException de)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot get sequence value for new message", de);
+ }
+ return newMessageId;
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public org.apache.qpid.server.store.Transaction newTransaction()
+ {
+ checkMessageStoreOpen();
+
+ return new BDBTransaction();
+ }
+
+ @Override
+ public void addEventListener(final EventListener eventListener, final Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+ visitMessagesInternal(handler, getEnvironmentFacade());
+ }
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ entries.add(entry);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(queueId, messageId))
+ {
+ break;
+ }
+ }
+
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getXidDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ preparedTransaction.getEnqueues(), preparedTransaction.getDequeues()))
+ {
+ break;
+ }
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+ }
+
+ /**
+ * Retrieves message meta-data.
+ *
+ * @param messageId The message to get the meta-data for.
+ *
+ * @return The message meta data.
+ *
+ * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
+ {
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("public MessageMetaData getMessageMetaData(Long messageId = "
+ + messageId + "): called");
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
+
+ try
+ {
+ OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Metadata not found for message with id " + messageId);
+ }
+
+ StorableMessageMetaData mdd = messageBinding.entryToObject(value);
+
+ return mdd;
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Error reading message metadata for message with id "
+ + messageId
+ + ": "
+ + e.getMessage(), e);
+ }
+ }
+
+ void removeMessage(long messageId, boolean sync) throws StoreException
+ {
+ boolean complete = false;
+ Transaction tx = null;
+
+ Random rand = null;
+ int attempts = 0;
+ try
+ {
+ do
+ {
+ tx = null;
+ try
+ {
+ tx = getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
+
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Removing message id " + messageId);
+ }
+
+
+ OperationStatus status = getMessageMetaDataDb().delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ getLogger().info(
+ "Message not found (attempt to remove failed - probably application initiated rollback) "
+ +
+ messageId);
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Deleted metadata for message " + messageId);
+ }
+
+ //now remove the content data from the store if there is any.
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ getMessageContentDb().delete(tx, contentKeyEntry);
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Deleted content for message " + messageId);
+ }
+
+ getEnvironmentFacade().commit(tx, sync);
+
+ complete = true;
+ tx = null;
+ }
+ catch (LockConflictException e)
+ {
+ try
+ {
+ if(tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch(DatabaseException e2)
+ {
+ getLogger().warn(
+ "Unable to abort transaction after LockConflictExcption on removal of message with id "
+ + messageId,
+ e2);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw getEnvironmentFacade().handleDatabaseException("Cannot remove message with id "
+ + messageId, e);
+ }
+
+
+ getLogger().warn("Lock timeout exception. Retrying (attempt "
+ + (attempts + 1) + " of " + LOCK_RETRY_ATTEMPTS + ") " + e);
+
+ if(++attempts < LOCK_RETRY_ATTEMPTS)
+ {
+ if(rand == null)
+ {
+ rand = new Random();
+ }
+
+ try
+ {
+ Thread.sleep(500l + (long)(500l * rand.nextDouble()));
+ }
+ catch (InterruptedException e1)
+ {
+
+ }
+ }
+ else
+ {
+ // rethrow the lock conflict exception since we could not solve by retrying
+ throw getEnvironmentFacade().handleDatabaseException("Cannot remove messages", e);
+ }
+ }
+ }
+ while(!complete);
+ }
+ catch (DatabaseException e)
+ {
+ getLogger().error("Unexpected BDB exception", e);
+
+ try
+ {
+ abortTransactionSafely(tx,
+ getEnvironmentFacade());
+ }
+ finally
+ {
+ tx = null;
+ }
+
+ throw getEnvironmentFacade().handleDatabaseException("Error removing message with id "
+ + messageId
+ + " from database: "
+ + e.getMessage(), e);
+ }
+ finally
+ {
+ try
+ {
+ abortTransactionSafely(tx,
+ getEnvironmentFacade());
+ }
+ finally
+ {
+ tx = null;
+ }
+ }
+ }
+
+
+ /**
+ * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
+ * from the specified offset in the message.
+ *
+ * @param messageId The message to get the data for.
+ * @param offset The offset of the data within the message.
+ * @param dst The destination of the content read back
+ *
+ * @return The number of bytes inserted into the destination
+ *
+ * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
+ {
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding contentTupleBinding = ContentBinding.getInstance();
+
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
+ }
+
+ try
+ {
+
+ int written = 0;
+ OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+ if (status == OperationStatus.SUCCESS)
+ {
+ byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
+ int size = dataAsBytes.length;
+ if (offset > size)
+ {
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
+
+ }
+
+ written = size - offset;
+ if(written > dst.remaining())
+ {
+ written = dst.remaining();
+ }
+
+ dst.put(dataAsBytes, offset, written);
+ }
+ return written;
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id "
+ + messageId
+ + " to database: "
+ + e.getMessage(), e);
+ }
+ }
+
+ private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = getMessageMetaDataDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+ if (!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot visit messages", e);
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Stores a chunk of message data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param offset The offset of the data chunk in the message.
+ * @param contentBody The content of the data chunk.
+ *
+ * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void addContent(final Transaction tx, long messageId, int offset,
+ ByteBuffer contentBody) throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding messageBinding = ContentBinding.getInstance();
+ messageBinding.objectToEntry(contentBody.array(), value);
+ try
+ {
+ OperationStatus status = getMessageContentDb().put(tx, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error adding content for message id " + messageId + ": " + status);
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Storing content for message " + messageId + " in transaction " + tx);
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Error writing AMQMessage with id "
+ + messageId
+ + " to database: "
+ + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Stores message meta-data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param messageMetaData The message meta data to store.
+ *
+ * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void storeMetaData(final Transaction tx, long messageId,
+ StorableMessageMetaData messageMetaData)
+ throws StoreException
+ {
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("storeMetaData called for transaction " + tx
+ + ", messageId " + messageId
+ + ", messageMetaData " + messageMetaData);
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
+ messageBinding.objectToEntry(messageMetaData, value);
+ try
+ {
+ getMessageMetaDataDb().put(tx, key, value);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Error writing message metadata with id "
+ + messageId
+ + " to database: "
+ + e.getMessage(), e);
+ }
+ }
+
+
+ /**
+ * Places a message onto a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The the queue to place the message on.
+ * @param messageId The message to enqueue.
+ *
+ * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
+ */
+ private void enqueueMessage(final Transaction tx, final TransactionLogResource queue,
+ long messageId) throws StoreException
+ {
+
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
+ keyBinding.objectToEntry(dd, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0, value);
+
+ try
+ {
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Enqueuing message " + messageId + " on queue "
+ + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
+ }
+ getDeliveryDb().put(tx, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ getLogger().error("Failed to enqueue: " + e.getMessage(), e);
+ throw getEnvironmentFacade().handleDatabaseException("Error writing enqueued message with id "
+ + messageId
+ + " for queue "
+ + queue.getName()
+ + " with id "
+ + queue.getId()
+ + " to database", e);
+ }
+ }
+
+ /**
+ * Extracts a message from a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The queue to take the message from.
+ * @param messageId The message to dequeue.
+ *
+ * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void dequeueMessage(final Transaction tx, final TransactionLogResource queue,
+ long messageId) throws StoreException
+ {
+
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
+ UUID id = queue.getId();
+ keyBinding.objectToEntry(queueEntryKey, key);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Dequeue message id " + messageId + " from queue "
+ + queue.getName() + " with id " + id);
+ }
+
+ try
+ {
+
+ OperationStatus status = getDeliveryDb().delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Unable to find message with id " + messageId + " on queue "
+ + queue.getName() + " with id " + id);
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Unable to remove message with id " + messageId + " on queue"
+ + queue.getName() + " with id " + id);
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Removed message " + messageId + " on queue "
+ + queue.getName() + " with id " + id);
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+
+ getLogger().error("Failed to dequeue message " + messageId + " in transaction " + tx, e);
+
+ throw getEnvironmentFacade().handleDatabaseException("Error accessing database while dequeuing message: "
+ + e.getMessage(), e);
+ }
+ }
+
+ private void recordXid(Transaction txn,
+ long format,
+ byte[] globalId,
+ byte[] branchId,
+ org.apache.qpid.server.store.Transaction.Record[] enqueues,
+ org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidBinding keyBinding = XidBinding.getInstance();
+ keyBinding.objectToEntry(xid,key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ valueBinding.objectToEntry(preparedTransaction, value);
+
+ try
+ {
+ getXidDb().put(txn, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ getLogger().error("Failed to write xid: " + e.getMessage(), e);
+ throw getEnvironmentFacade().handleDatabaseException("Error writing xid to database", e);
+ }
+ }
+
+ private void removeXid(Transaction txn, long format, byte[] globalId, byte[] branchId)
+ throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidBinding keyBinding = XidBinding.getInstance();
+
+ keyBinding.objectToEntry(xid, key);
+
+
+ try
+ {
+
+ OperationStatus status = getXidDb().delete(txn, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Unable to find xid");
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Unable to remove xid");
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+
+ getLogger().error("Failed to remove xid in transaction " + txn, e);
+
+ throw getEnvironmentFacade().handleDatabaseException("Error accessing database while removing xid: "
+ + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Commits all operations performed within a given transaction.
+ *
+ * @param tx The transaction to commit all operations for.
+ *
+ * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
+ */
+ private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
+ {
+ if (tx == null)
+ {
+ throw new StoreException("Fatal internal error: transactional is null at commitTran");
+ }
+
+ StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit);
+
+ if (getLogger().isDebugEnabled())
+ {
+ String transactionType = syncCommit ? "synchronous" : "asynchronous";
+ getLogger().debug("commitTranImpl completed " + transactionType + " transaction " + tx);
+ }
+
+ return result;
+ }
+
+ /**
+ * Abandons all operations performed within a given transaction.
+ *
+ * @param tx The transaction to abandon.
+ *
+ * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
+ */
+ private void abortTran(final Transaction tx) throws StoreException
+ {
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("abortTran called for transaction " + tx);
+ }
+
+ try
+ {
+ tx.abort();
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Error aborting transaction: " + e.getMessage(), e);
+ }
+ }
+
+ private void storedSizeChangeOccurred(final int delta) throws StoreException
+ {
+ try
+ {
+ storedSizeChange(delta);
+ }
+ catch(DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Stored size change exception", e);
+ }
+ }
+
+ private void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized (this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 2*delta;
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ _totalStoreSize = getSizeOnDisk();
+
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ _totalStoreSize = getSizeOnDisk();
+
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk();
+
+ _totalStoreSize = getSizeOnDisk();
+
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ }
+ }
+
+ private void reduceSizeOnDisk()
+ {
+ getEnvironmentFacade().getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
+ boolean cleaned = false;
+ while (getEnvironmentFacade().getEnvironment().cleanLog() > 0)
+ {
+ cleaned = true;
+ }
+ if (cleaned)
+ {
+ CheckpointConfig force = new CheckpointConfig();
+ force.setForce(true);
+ getEnvironmentFacade().getEnvironment().checkpoint(force);
+ }
+
+
+ getEnvironmentFacade().getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ }
+
+ private long getSizeOnDisk()
+ {
+ return getEnvironmentFacade().getEnvironment().getStats(null).getTotalLogSize();
+ }
+
+ private Database getMessageContentDb()
+ {
+ return getEnvironmentFacade().openDatabase(MESSAGE_CONTENT_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ }
+
+ private Database getMessageMetaDataDb()
+ {
+ return getEnvironmentFacade().openDatabase(MESSAGE_META_DATA_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ }
+
+ private Database getMessageMetaDataSeqDb()
+ {
+ return getEnvironmentFacade().openDatabase(MESSAGE_META_DATA_SEQ_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ }
+
+ private Database getDeliveryDb()
+ {
+ return getEnvironmentFacade().openDatabase(DELIVERY_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ }
+
+ private Database getXidDb()
+ {
+ return getEnvironmentFacade().openDatabase(XID_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ }
+
+ protected abstract void checkMessageStoreOpen();
+
+ protected abstract ConfiguredObject<?> getParent();
+
+ protected abstract EnvironmentFacade getEnvironmentFacade();
+
+ protected abstract long getPersistentSizeLowThreshold();
+
+ protected abstract long getPersistentSizeHighThreshold();
+
+ protected abstract Logger getLogger();
+
+ private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ {
+
+ private final long _messageId;
+ private final boolean _isRecovered;
+
+ private T _metaData;
+ private volatile SoftReference<T> _metaDataRef;
+
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
+ StoredBDBMessage(long messageId, T metaData)
+ {
+ this(messageId, metaData, false);
+ }
+
+ StoredBDBMessage(long messageId, T metaData, boolean isRecovered)
+ {
+ _messageId = messageId;
+ _isRecovered = isRecovered;
+
+ if(!_isRecovered)
+ {
+ _metaData = metaData;
+ }
+ _metaDataRef = new SoftReference<T>(metaData);
+ }
+
+ @Override
+ public T getMetaData()
+ {
+ T metaData = _metaDataRef.get();
+ if(metaData == null)
+ {
+ checkMessageStoreOpen();
+
+ metaData = (T) getMessageMetaData(_messageId);
+ _metaDataRef = new SoftReference<T>(metaData);
+ }
+
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData, 0, _data, 0, oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ checkMessageStoreOpen();
+
+ return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ return ByteBuffer.wrap(data,offsetInMessage,size);
+ }
+ else
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ int length = getContent(offsetInMessage, buf);
+ buf.limit(length);
+ buf.position(0);
+ return buf;
+ }
+ }
+
+ synchronized void store(Transaction txn)
+ {
+ if (!stored())
+ {
+ try
+ {
+ _dataRef = new SoftReference<byte[]>(_data);
+ AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
+ AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+ }
+
+ @Override
+ public synchronized StoreFuture flushToStore()
+ {
+ if(!stored())
+ {
+ checkMessageStoreOpen();
+
+ Transaction txn;
+ try
+ {
+ txn = getEnvironmentFacade().getEnvironment().beginTransaction(
+ null, null);
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e);
+ }
+ store(txn);
+ getEnvironmentFacade().commit(txn, true);
+
+ storedSizeChangeOccurred(getMetaData().getContentSize());
+ }
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ checkMessageStoreOpen();
+
+ int delta = getMetaData().getContentSize();
+ removeMessage(_messageId, false);
+ storedSizeChangeOccurred(-delta);
+ }
+
+ private boolean stored()
+ {
+ return _metaData == null || _isRecovered;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass() + "[messageId=" + _messageId + "]";
+ }
+ }
+
+
+ private class BDBTransaction implements org.apache.qpid.server.store.Transaction
+ {
+ private Transaction _txn;
+ private int _storeSizeIncrease;
+
+ private BDBTransaction() throws StoreException
+ {
+ try
+ {
+ _txn = getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
+ }
+ catch(DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot create store transaction", e);
+ }
+ }
+
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ if(message.getStoredMessage() instanceof StoredBDBMessage)
+ {
+ final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+
+ AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ @Override
+ public void commitTran() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+ AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
+ }
+
+ @Override
+ public StoreFuture commitTranAsync() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
+ return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+ }
+
+ @Override
+ public void abortTran() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ AbstractBDBMessageStore.this.abortTran(_txn);
+ }
+
+ @Override
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
+ }
+
+ @Override
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
+ Record[] dequeues) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index 37792cdd43..e8555dd819 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -20,96 +20,62 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely;
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely;
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
+
import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Sequence;
-import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
-import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.Xid;
import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
-import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.util.FileUtils;
/**
- * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept
- * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove
- * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
- * dequeue messages to queues. <tr><td> Generate message identifiers. </table>
+ * Implementation of a DurableConfigurationStore backed by BDB JE
+ * that also provides a MessageStore.
*/
public class BDBConfigurationStore implements MessageStoreProvider, DurableConfigurationStore
{
- public static final DatabaseConfig DEFAULT_DATABASE_CONFIG = new DatabaseConfig().setTransactional(true).setAllowCreate(true);
-
private static final Logger LOGGER = Logger.getLogger(BDBConfigurationStore.class);
public static final int VERSION = 8;
private static final String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
private static final String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY";
- private EnvironmentFacade _environmentFacade;
-
private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
private final EnvironmentFacadeFactory _environmentFacadeFactory;
+ private final ProvidedBDBMessageStore _providedMessageStore = new ProvidedBDBMessageStore();
+
+ private EnvironmentFacade _environmentFacade;
+
private String _storeLocation;
- private final BDBMessageStore _messageStore = new BDBMessageStore();
private ConfiguredObject<?> _parent;
public BDBConfigurationStore()
@@ -168,7 +134,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
{
throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
}
-
}
private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
@@ -211,8 +176,8 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
finally
{
- closeCursorSafely(objectsCursor);
- closeCursorSafely(hierarchyCursor);
+ closeCursorSafely(objectsCursor, _environmentFacade);
+ closeCursorSafely(hierarchyCursor, _environmentFacade);
}
for (ConfiguredObjectRecord record : configuredObjects.values())
@@ -226,11 +191,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
- public String getStoreLocation()
- {
- return _storeLocation;
- }
-
public EnvironmentFacade getEnvironmentFacade()
{
return _environmentFacade;
@@ -241,7 +201,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
{
if (_configurationStoreOpen.compareAndSet(true, false))
{
- if (!_messageStore.isMessageStoreOpen())
+ if (!_providedMessageStore.isMessageStoreOpen())
{
closeEnvironment();
}
@@ -264,38 +224,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
}
- private void closeCursorSafely(Cursor cursor) throws StoreException
- {
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot close cursor", e);
- }
- }
- }
-
- private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx)
- {
- try
- {
- if (tx != null)
- {
- tx.abort();
- }
- }
- catch (DatabaseException e1)
- {
- // We need the possible side effect of the handler restarting the environment but don't care about the exception
- _environmentFacade.handleDatabaseException(null, e1);
- LOGGER.warn(errorMessage, e1);
- }
- }
-
@Override
public void create(ConfiguredObjectRecord configuredObject) throws StoreException
{
@@ -323,7 +251,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
{
if (txn != null)
{
- abortTransactionIgnoringException("Error creating configured object", txn);
+ abortTransactionSafely(txn, _environmentFacade);
}
}
}
@@ -359,7 +287,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
{
if (txn != null)
{
- abortTransactionIgnoringException("Error deleting configured objects", txn);
+ abortTransactionSafely(txn, _environmentFacade);
}
}
}
@@ -388,7 +316,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
{
if (txn != null)
{
- abortTransactionIgnoringException("Error updating configuration details within the store", txn);
+ abortTransactionSafely(txn, _environmentFacade);
}
}
}
@@ -512,7 +440,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
@Override
public MessageStore getMessageStore()
{
- return _messageStore;
+ return _providedMessageStore;
}
private void checkConfigurationStoreOpen()
@@ -523,9 +451,10 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
}
+ @Override
public void onDelete()
{
- if (!isConfigurationStoreOpen() && !_messageStore.isMessageStoreOpen())
+ if (!isConfigurationStoreOpen() && !_providedMessageStore.isMessageStoreOpen())
{
if (_storeLocation != null)
{
@@ -561,37 +490,12 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
return _environmentFacade.openDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, DEFAULT_DATABASE_CONFIG);
}
- class BDBMessageStore implements MessageStore
+ class ProvidedBDBMessageStore extends AbstractBDBMessageStore
{
- private static final int LOCK_RETRY_ATTEMPTS = 5;
-
- private static final String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
- private static final String MESSAGE_META_DATA_SEQ_DB_NAME = "MESSAGE_METADATA.SEQ";
- private static final String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
- private static final String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
-
- //TODO: Add upgrader to remove BRIDGES and LINKS
- private static final String BRIDGEDB_NAME = "BRIDGES";
- private static final String LINKDB_NAME = "LINKS";
- private static final String XID_DB_NAME = "XIDS";
-
- private final EventManager _eventManager = new EventManager();
-
private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
- private final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(
- Charset.forName("UTF-8")));
-
- private final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT.
- setAllowCreate(true).
- setInitialValue(1).
- setWrap(true).
- setCacheSize(100000);
-
- private boolean _limitBusted;
private long _persistentSizeLowThreshold;
private long _persistentSizeHighThreshold;
- private long _totalStoreSize;
private ConfiguredObject<?> _parent;
@@ -606,79 +510,19 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
Object underfullAttr = messageStoreSettings.get(UNDERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null ? -1l :
- overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ overfullAttr instanceof Number
+ ? ((Number) overfullAttr).longValue()
+ : Long.parseLong(overfullAttr.toString());
_persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
- underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+ underfullAttr instanceof Number
+ ? ((Number) underfullAttr).longValue()
+ : Long.parseLong(underfullAttr.toString());
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
}
-
- if (_environmentFacade == null)
- {
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(parent, messageStoreSettings);
- _storeLocation = _environmentFacade.getStoreLocation();
- }
- }
- }
-
- @Override
- public void upgradeStoreStructure() throws StoreException
- {
- try
- {
- new Upgrader(_environmentFacade.getEnvironment(), _parent).upgradeIfNecessary();
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot upgrade store", e);
- }
-
- // TODO this relies on the fact that the VH will call upgrade just before putting the VH into service.
- _totalStoreSize = getSizeOnDisk();
- }
-
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
- {
-
- long newMessageId = getNextMessageSequenceNumber();
-
- if (metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
- }
- else
- {
- return new StoredMemoryMessage<T>(newMessageId, metaData);
- }
- }
-
- private long getNextMessageSequenceNumber()
- {
- long newMessageId;
- try
- {
- // The implementations of sequences mean that there is only a transaction
- // after every n sequence values, where n is the MESSAGE_METADATA_SEQ_CONFIG.getCacheSize()
-
- Sequence mmdSeq = _environmentFacade.openSequence(getMessageMetaDataSeqDb(),
- MESSAGE_METADATA_SEQ_KEY,
- MESSAGE_METADATA_SEQ_CONFIG);
- newMessageId = mmdSeq.get(null, 1);
}
- catch (DatabaseException de)
- {
- throw _environmentFacade.handleDatabaseException("Cannot get sequence value for new message", de);
- }
- return newMessageId;
- }
-
- @Override
- public boolean isPersistent()
- {
- return true;
}
public boolean isMessageStoreOpen()
@@ -687,673 +531,49 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
@Override
- public org.apache.qpid.server.store.Transaction newTransaction()
- {
- checkMessageStoreOpen();
-
- return new BDBTransaction();
- }
-
- @Override
public void closeMessageStore()
{
- if (_messageStoreOpen.compareAndSet(true, false) && !isConfigurationStoreOpen())
- {
- closeEnvironment();
- }
+ _messageStoreOpen.set(false);
}
@Override
- public void addEventListener(final EventListener eventListener, final Event... events)
+ public EnvironmentFacade getEnvironmentFacade()
{
- _eventManager.addEventListener(eventListener, events);
+ return _environmentFacade;
}
@Override
public void onDelete()
{
- BDBConfigurationStore.this.onDelete();
+ // Nothing to do, message store will be deleted when configuration store is deleted
}
@Override
public String getStoreLocation()
{
- return BDBConfigurationStore.this.getStoreLocation();
+ return BDBConfigurationStore.this._storeLocation;
}
@Override
- public void visitMessages(final MessageHandler handler) throws StoreException
+ protected long getPersistentSizeLowThreshold()
{
- checkMessageStoreOpen();
- visitMessagesInternal(handler, _environmentFacade);
+ return _persistentSizeLowThreshold;
}
@Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ protected long getPersistentSizeHighThreshold()
{
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
- {
- QueueEntryKey entry = keyBinding.entryToObject(key);
- entries.add(entry);
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- if (!handler.handle(queueId, messageId))
- {
- break;
- }
- }
-
+ return _persistentSizeHighThreshold;
}
@Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- try
- {
- cursor = getXidDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- preparedTransaction.getEnqueues(), preparedTransaction.getDequeues()))
- {
- break;
- }
- }
-
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- /**
- * Retrieves message meta-data.
- *
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = "
- + messageId + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
-
- try
- {
- OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Metadata not found for message with id " + messageId);
- }
-
- StorableMessageMetaData mdd = messageBinding.entryToObject(value);
-
- return mdd;
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
- }
- }
-
- void removeMessage(long messageId, boolean sync) throws StoreException
- {
- boolean complete = false;
- com.sleepycat.je.Transaction tx = null;
-
- Random rand = null;
- int attempts = 0;
- try
- {
- do
- {
- tx = null;
- try
- {
- tx = _environmentFacade.getEnvironment().beginTransaction(null, null);
-
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removing message id " + messageId);
- }
-
-
- OperationStatus status = getMessageMetaDataDb().delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
- messageId);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleted metadata for message " + messageId);
- }
-
- //now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- getMessageContentDb().delete(tx, contentKeyEntry);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleted content for message " + messageId);
- }
-
- _environmentFacade.commit(tx, sync);
-
- complete = true;
- tx = null;
- }
- catch (LockConflictException e)
- {
- try
- {
- if(tx != null)
- {
- tx.abort();
- }
- }
- catch(DatabaseException e2)
- {
- LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2);
- // rethrow the original log conflict exception, the secondary exception should already have
- // been logged.
- throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e);
- }
-
-
- LOGGER.warn("Lock timeout exception. Retrying (attempt "
- + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
-
- if(++attempts < LOCK_RETRY_ATTEMPTS)
- {
- if(rand == null)
- {
- rand = new Random();
- }
-
- try
- {
- Thread.sleep(500l + (long)(500l * rand.nextDouble()));
- }
- catch (InterruptedException e1)
- {
-
- }
- }
- else
- {
- // rethrow the lock conflict exception since we could not solve by retrying
- throw _environmentFacade.handleDatabaseException("Cannot remove messages", e);
- }
- }
- }
- while(!complete);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Unexpected BDB exception", e);
-
- try
- {
- abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
- }
- finally
- {
- tx = null;
- }
-
- throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
- }
- finally
- {
- try
- {
- abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
- }
- finally
- {
- tx = null;
- }
- }
- }
-
-
- /**
- * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
- * from the specified offset in the message.
- *
- * @param messageId The message to get the data for.
- * @param offset The offset of the data within the message.
- * @param dst The destination of the content read back
- *
- * @return The number of bytes inserted into the destination
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
- {
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding contentTupleBinding = ContentBinding.getInstance();
-
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
- }
-
- try
- {
-
- int written = 0;
- OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
- if (status == OperationStatus.SUCCESS)
- {
- byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
- int size = dataAsBytes.length;
- if (offset > size)
- {
- throw new RuntimeException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
-
- }
-
- written = size - offset;
- if(written > dst.remaining())
- {
- written = dst.remaining();
- }
-
- dst.put(dataAsBytes, offset, written);
- }
- return written;
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
- {
- Cursor cursor = null;
- try
- {
- cursor = getMessageMetaDataDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
- if (!handler.handle(message))
- {
- break;
- }
- }
- }
- catch (DatabaseException e)
- {
- throw environmentFacade.handleDatabaseException("Cannot visit messages", e);
- }
- finally
- {
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch(DatabaseException e)
- {
- throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
- }
- }
- }
- }
-
- /**
- * Stores a chunk of message data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param offset The offset of the data chunk in the message.
- * @param contentBody The content of the data chunk.
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
- ByteBuffer contentBody) throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding messageBinding = ContentBinding.getInstance();
- messageBinding.objectToEntry(contentBody.array(), value);
- try
- {
- OperationStatus status = getMessageContentDb().put(tx, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error adding content for message id " + messageId + ": " + status);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
-
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Stores message meta-data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param messageMetaData The message meta data to store.
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
- StorableMessageMetaData messageMetaData)
- throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("storeMetaData called for transaction " + tx
- + ", messageId " + messageId
- + ", messageMetaData " + messageMetaData);
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
-
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
- messageBinding.objectToEntry(messageMetaData, value);
- try
- {
- getMessageMetaDataDb().put(tx, key, value);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
-
- /**
- * Places a message onto a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The the queue to place the message on.
- * @param messageId The message to enqueue.
- *
- * @throws StoreException If the operation fails for any reason.
- */
- private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws StoreException
- {
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
- keyBinding.objectToEntry(dd, key);
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Enqueuing message " + messageId + " on queue "
- + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
- }
- getDeliveryDb().put(tx, key, value);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
- throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
- + queue.getName() + " with id " + queue.getId() + " to database", e);
- }
- }
-
- /**
- * Extracts a message from a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The queue to take the message from.
- * @param messageId The message to dequeue.
- *
- * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws StoreException
- {
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
- UUID id = queue.getId();
- keyBinding.objectToEntry(queueEntryKey, key);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Dequeue message id " + messageId + " from queue "
- + queue.getName() + " with id " + id);
- }
-
- try
- {
-
- OperationStatus status = getDeliveryDb().delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Unable to find message with id " + messageId + " on queue "
- + queue.getName() + " with id " + id);
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Unable to remove message with id " + messageId + " on queue"
- + queue.getName() + " with id " + id);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removed message " + messageId + " on queue "
- + queue.getName() + " with id " + id);
-
- }
- }
- catch (DatabaseException e)
- {
-
- LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e);
-
- throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e);
- }
- }
-
- private void recordXid(com.sleepycat.je.Transaction txn,
- long format,
- byte[] globalId,
- byte[] branchId,
- org.apache.qpid.server.store.Transaction.Record[] enqueues,
- org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
- keyBinding.objectToEntry(xid,key);
-
- DatabaseEntry value = new DatabaseEntry();
- PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- valueBinding.objectToEntry(preparedTransaction, value);
-
- try
- {
- getXidDb().put(txn, key, value);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Failed to write xid: " + e.getMessage(), e);
- throw _environmentFacade.handleDatabaseException("Error writing xid to database", e);
- }
- }
-
- private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
- throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
-
- keyBinding.objectToEntry(xid, key);
-
-
- try
- {
-
- OperationStatus status = getXidDb().delete(txn, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Unable to find xid");
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Unable to remove xid");
- }
-
- }
- catch (DatabaseException e)
- {
-
- LOGGER.error("Failed to remove xid in transaction " + txn, e);
-
- throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e);
- }
- }
-
- /**
- * Commits all operations performed within a given transaction.
- *
- * @param tx The transaction to commit all operations for.
- *
- * @throws StoreException If the operation fails for any reason.
- */
- private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException
- {
- if (tx == null)
- {
- throw new StoreException("Fatal internal error: transactional is null at commitTran");
- }
-
- StoreFuture result = _environmentFacade.commit(tx, syncCommit);
-
- if (LOGGER.isDebugEnabled())
- {
- String transactionType = syncCommit ? "synchronous" : "asynchronous";
- LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
- }
-
- return result;
- }
-
- /**
- * Abandons all operations performed within a given transaction.
- *
- * @param tx The transaction to abandon.
- *
- * @throws StoreException If the operation fails for any reason.
- */
- private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException
+ protected ConfiguredObject<?> getParent()
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("abortTran called for transaction " + tx);
- }
-
- try
- {
- tx.abort();
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e);
- }
+ return _parent;
}
- private void checkMessageStoreOpen()
+ @Override
+ protected void checkMessageStoreOpen()
{
if (!_messageStoreOpen.get())
{
@@ -1361,384 +581,11 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
}
- private void storedSizeChangeOccurred(final int delta) throws StoreException
- {
- try
- {
- storedSizeChange(delta);
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Stored size change exception", e);
- }
- }
-
- private void storedSizeChange(final int delta)
- {
- if(getPersistentSizeHighThreshold() > 0)
- {
- synchronized (this)
- {
- // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
- // time, so we do so only when there's been enough change that it is worth looking again. We do this by
- // assuming the total size will change by less than twice the amount of the message data change.
- long newSize = _totalStoreSize += 2*delta;
-
- if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
- {
- _totalStoreSize = getSizeOnDisk();
-
- if(_totalStoreSize > getPersistentSizeHighThreshold())
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
- }
- else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
- {
- long oldSize = _totalStoreSize;
- _totalStoreSize = getSizeOnDisk();
-
- if(oldSize <= _totalStoreSize)
- {
-
- reduceSizeOnDisk();
-
- _totalStoreSize = getSizeOnDisk();
-
- }
-
- if(_totalStoreSize < getPersistentSizeLowThreshold())
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
-
-
- }
- }
- }
- }
-
- private long getPersistentSizeLowThreshold()
- {
- return _persistentSizeLowThreshold;
- }
-
- private long getPersistentSizeHighThreshold()
- {
- return _persistentSizeHighThreshold;
- }
-
- private void reduceSizeOnDisk()
- {
- _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
- boolean cleaned = false;
- while (_environmentFacade.getEnvironment().cleanLog() > 0)
- {
- cleaned = true;
- }
- if (cleaned)
- {
- CheckpointConfig force = new CheckpointConfig();
- force.setForce(true);
- _environmentFacade.getEnvironment().checkpoint(force);
- }
-
-
- _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
- }
-
- private long getSizeOnDisk()
- {
- return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize();
- }
-
- private Database getMessageContentDb()
- {
- return _environmentFacade.openDatabase(MESSAGE_CONTENT_DB_NAME, DEFAULT_DATABASE_CONFIG);
- }
-
- private Database getMessageMetaDataDb()
- {
- return _environmentFacade.openDatabase(MESSAGE_META_DATA_DB_NAME, DEFAULT_DATABASE_CONFIG);
- }
-
- private Database getMessageMetaDataSeqDb()
- {
- return _environmentFacade.openDatabase(MESSAGE_META_DATA_SEQ_DB_NAME, DEFAULT_DATABASE_CONFIG);
- }
-
- private Database getDeliveryDb()
- {
- return _environmentFacade.openDatabase(DELIVERY_DB_NAME, DEFAULT_DATABASE_CONFIG);
- }
-
- private Database getXidDb()
- {
- return _environmentFacade.openDatabase(XID_DB_NAME, DEFAULT_DATABASE_CONFIG);
- }
-
- private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
- {
-
- private final long _messageId;
- private final boolean _isRecovered;
-
- private T _metaData;
- private volatile SoftReference<T> _metaDataRef;
-
- private byte[] _data;
- private volatile SoftReference<byte[]> _dataRef;
-
- StoredBDBMessage(long messageId, T metaData)
- {
- this(messageId, metaData, false);
- }
-
- StoredBDBMessage(long messageId, T metaData, boolean isRecovered)
- {
- _messageId = messageId;
- _isRecovered = isRecovered;
-
- if(!_isRecovered)
- {
- _metaData = metaData;
- }
- _metaDataRef = new SoftReference<T>(metaData);
- }
-
- @Override
- public T getMetaData()
- {
- T metaData = _metaDataRef.get();
- if(metaData == null)
- {
- checkMessageStoreOpen();
-
- metaData = (T) getMessageMetaData(_messageId);
- _metaDataRef = new SoftReference<T>(metaData);
- }
-
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- @Override
- public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
- {
- src = src.slice();
-
- if(_data == null)
- {
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
- }
- else
- {
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
-
- System.arraycopy(oldData, 0, _data, 0, oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
- }
-
- }
-
- @Override
- public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
- }
- else
- {
- checkMessageStoreOpen();
-
- return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
- }
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- return ByteBuffer.wrap(data,offsetInMessage,size);
- }
- else
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- int length = getContent(offsetInMessage, buf);
- buf.limit(length);
- buf.position(0);
- return buf;
- }
- }
-
- synchronized void store(com.sleepycat.je.Transaction txn)
- {
- if (!stored())
- {
- try
- {
- _dataRef = new SoftReference<byte[]>(_data);
- BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
- BDBMessageStore.this.addContent(txn, _messageId, 0,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
- }
- finally
- {
- _metaData = null;
- _data = null;
- }
- }
- }
-
- @Override
- public synchronized StoreFuture flushToStore()
- {
- if(!stored())
- {
- checkMessageStoreOpen();
-
- com.sleepycat.je.Transaction txn;
- try
- {
- txn = _environmentFacade.getEnvironment().beginTransaction(
- null, null);
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
- }
- store(txn);
- _environmentFacade.commit(txn, true);
-
- storedSizeChangeOccurred(getMetaData().getContentSize());
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void remove()
- {
- checkMessageStoreOpen();
-
- int delta = getMetaData().getContentSize();
- removeMessage(_messageId, false);
- storedSizeChangeOccurred(-delta);
- }
-
- private boolean stored()
- {
- return _metaData == null || _isRecovered;
- }
-
- @Override
- public String toString()
- {
- return this.getClass() + "[messageId=" + _messageId + "]";
- }
- }
-
-
- private class BDBTransaction implements org.apache.qpid.server.store.Transaction
+ @Override
+ protected Logger getLogger()
{
- private com.sleepycat.je.Transaction _txn;
- private int _storeSizeIncrease;
-
- private BDBTransaction() throws StoreException
- {
- try
- {
- _txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e);
- }
- }
-
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
- {
- checkMessageStoreOpen();
-
- if(message.getStoredMessage() instanceof StoredBDBMessage)
- {
- final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
- }
-
- BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
- }
-
- @Override
- public void commitTran() throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.commitTranImpl(_txn, true);
- BDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- }
-
- @Override
- public StoreFuture commitTranAsync() throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- return BDBMessageStore.this.commitTranImpl(_txn, false);
- }
-
- @Override
- public void abortTran() throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.abortTran(_txn);
- }
-
- @Override
- public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
- }
-
- @Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
- Record[] dequeues) throws StoreException
- {
- checkMessageStoreOpen();
-
- BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
- }
+ return LOGGER;
}
-
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
new file mode 100644
index 0000000000..c3969090be
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.berkeleydb;
+
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.sleepycat.je.DatabaseException;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.util.FileUtils;
+
+/**
+ * Implementation of a MessageStore backed by BDB JE.
+ */
+public class BDBMessageStore extends AbstractBDBMessageStore
+{
+ private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
+
+ private final EnvironmentFacadeFactory _environmentFacadeFactory;
+
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
+
+ private EnvironmentFacade _environmentFacade;
+
+ private ConfiguredObject<?> _parent;
+ private String _storeLocation;
+
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
+ public BDBMessageStore()
+ {
+ this(new StandardEnvironmentFacadeFactory());
+ }
+
+ public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
+ {
+ _environmentFacadeFactory = environmentFacadeFactory;
+ }
+
+ @Override
+ public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+
+ Object overfullAttr = messageStoreSettings.get(OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(UNDERFULL_SIZE);
+
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(parent, messageStoreSettings);
+ _storeLocation = _environmentFacade.getStoreLocation();
+ }
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ if (_messageStoreOpen.compareAndSet(true, false))
+ {
+ if (_environmentFacade != null)
+ {
+ try
+ {
+ _environmentFacade.close();
+ _environmentFacade = null;
+ }
+ catch (DatabaseException e)
+ {
+ throw new StoreException("Exception occurred on message store close", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onDelete()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ if (_storeLocation != null)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting store " + _storeLocation);
+ }
+
+ File location = new File(_storeLocation);
+ if (location.exists())
+ {
+ if (!FileUtils.delete(location, true))
+ {
+ LOGGER.error("Cannot delete " + _storeLocation);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public EnvironmentFacade getEnvironmentFacade()
+ {
+ return _environmentFacade;
+ }
+
+ @Override
+ protected long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ @Override
+ protected long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ @Override
+ protected void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
+ @Override
+ protected ConfiguredObject<?> getParent()
+ {
+ return _parent;
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _storeLocation;
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
new file mode 100644
index 0000000000..8504971a7a
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
+
+import org.apache.qpid.server.store.StoreException;
+
+public class BDBUtils
+{
+ public static final DatabaseConfig DEFAULT_DATABASE_CONFIG = new DatabaseConfig().setTransactional(true).setAllowCreate(true);
+
+ public static void closeCursorSafely(Cursor cursor, final EnvironmentFacade environmentFacade) throws StoreException
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch(DatabaseException e)
+ {
+ // We need the possible side effect of the facade restarting the environment but don't care about the exception
+ throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
+ }
+ }
+ }
+
+ public static void abortTransactionSafely(Transaction tx, EnvironmentFacade environmentFacade)
+ {
+ try
+ {
+ if (tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch (DatabaseException e)
+ {
+ // We need the possible side effect of the facade restarting the environment but don't care about the exception
+ environmentFacade.handleDatabaseException("Cannot abort transaction", e);
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
index 2f1c9f9387..7c2dcc507d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@ManagedObject(category = false, type = BDBVirtualHost.VIRTUAL_HOST_TYPE)
@@ -45,6 +46,6 @@ public class BDBVirtualHost extends AbstractVirtualHost<BDBVirtualHost>
@Override
protected MessageStore createMessageStore()
{
- return new BDBConfigurationStore().getMessageStore();
+ return new BDBMessageStore();
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
index f445171005..65fc0e9168 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -78,7 +78,7 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB
@Override
protected MessageStore createStore() throws Exception
{
- MessageStore store = (new BDBConfigurationStore()).getMessageStore();
+ MessageStore store = new BDBMessageStore();
return store;
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index d623da846c..e40c6213cf 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -50,7 +50,7 @@ import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.util.FileUtils;
-import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
/**
* Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
@@ -431,7 +431,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return new BDBConfigurationStore().getMessageStore();
+ return new BDBMessageStore();
}
}