diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-19 12:07:14 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-19 12:07:14 +0000 |
| commit | a2f02366378181d75ede5dd02246e0c690483329 (patch) | |
| tree | be39a4bc026f8f8ab2bb94735a1bf95c4c455236 /qpid/java | |
| parent | 5ee6c9aec2fe07ed61fae0436b1956c454dd9f25 (diff) | |
| download | qpid-python-a2f02366378181d75ede5dd02246e0c690483329.tar.gz | |
QPID-5800: [Java Broker] Refactor BDB message store implementation to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with
a BDB virtual host.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603849 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
7 files changed, 1507 insertions, 1200 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java new file mode 100644 index 0000000000..4072c483b2 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -0,0 +1,1219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.*; +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely; +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely; +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG; + +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.LockConflictException; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Sequence; +import com.sleepycat.je.SequenceConfig; +import com.sleepycat.je.Transaction; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.EventManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMemoryMessage; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.Xid; +import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; +import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; +import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; + + +public abstract class AbstractBDBMessageStore implements MessageStore +{ + + private static final int LOCK_RETRY_ATTEMPTS = 5; + + private static final String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; + private static final String MESSAGE_META_DATA_SEQ_DB_NAME = "MESSAGE_METADATA.SEQ"; + private static final String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; + private static final String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; + + //TODO: Add upgrader to remove BRIDGES and LINKS + private static final String BRIDGEDB_NAME = "BRIDGES"; + private static final String LINKDB_NAME = "LINKS"; + private static final String XID_DB_NAME = "XIDS"; + + private final EventManager _eventManager = new EventManager(); + + private final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes( + Charset.forName("UTF-8"))); + + private final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT. + setAllowCreate(true). + setInitialValue(1). + setWrap(true). + setCacheSize(100000); + + private boolean _limitBusted; + private long _totalStoreSize; + + @Override + public void upgradeStoreStructure() throws StoreException + { + try + { + new Upgrader(getEnvironmentFacade().getEnvironment(), getParent()).upgradeIfNecessary(); + } + catch(DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot upgrade store", e); + } + + // TODO this relies on the fact that the VH will call upgrade just before putting the VH into service. + _totalStoreSize = getSizeOnDisk(); + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + { + + long newMessageId = getNextMessageSequenceNumber(); + + if (metaData.isPersistent()) + { + return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData); + } + else + { + return new StoredMemoryMessage<T>(newMessageId, metaData); + } + } + + private long getNextMessageSequenceNumber() + { + long newMessageId; + try + { + // The implementations of sequences mean that there is only a transaction + // after every n sequence values, where n is the MESSAGE_METADATA_SEQ_CONFIG.getCacheSize() + + Sequence mmdSeq = getEnvironmentFacade().openSequence(getMessageMetaDataSeqDb(), + MESSAGE_METADATA_SEQ_KEY, + MESSAGE_METADATA_SEQ_CONFIG); + newMessageId = mmdSeq.get(null, 1); + } + catch (DatabaseException de) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot get sequence value for new message", de); + } + return newMessageId; + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public org.apache.qpid.server.store.Transaction newTransaction() + { + checkMessageStoreOpen(); + + return new BDBTransaction(); + } + + @Override + public void addEventListener(final EventListener eventListener, final Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + checkMessageStoreOpen(); + visitMessagesInternal(handler, getEnvironmentFacade()); + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + entries.add(entry); + } + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(queueId, messageId)) + { + break; + } + } + + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + try + { + cursor = getXidDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidBinding keyBinding = XidBinding.getInstance(); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), + preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) + { + break; + } + } + + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + } + + /** + * Retrieves message meta-data. + * + * @param messageId The message to get the meta-data for. + * + * @return The message meta data. + * + * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException + { + if (getLogger().isDebugEnabled()) + { + getLogger().debug("public MessageMetaData getMessageMetaData(Long messageId = " + + messageId + "): called"); + } + + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); + + try + { + OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Metadata not found for message with id " + messageId); + } + + StorableMessageMetaData mdd = messageBinding.entryToObject(value); + + return mdd; + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Error reading message metadata for message with id " + + messageId + + ": " + + e.getMessage(), e); + } + } + + void removeMessage(long messageId, boolean sync) throws StoreException + { + boolean complete = false; + Transaction tx = null; + + Random rand = null; + int attempts = 0; + try + { + do + { + tx = null; + try + { + tx = getEnvironmentFacade().getEnvironment().beginTransaction(null, null); + + //remove the message meta data from the store + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Removing message id " + messageId); + } + + + OperationStatus status = getMessageMetaDataDb().delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + getLogger().info( + "Message not found (attempt to remove failed - probably application initiated rollback) " + + + messageId); + } + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Deleted metadata for message " + messageId); + } + + //now remove the content data from the store if there is any. + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + getMessageContentDb().delete(tx, contentKeyEntry); + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Deleted content for message " + messageId); + } + + getEnvironmentFacade().commit(tx, sync); + + complete = true; + tx = null; + } + catch (LockConflictException e) + { + try + { + if(tx != null) + { + tx.abort(); + } + } + catch(DatabaseException e2) + { + getLogger().warn( + "Unable to abort transaction after LockConflictExcption on removal of message with id " + + messageId, + e2); + // rethrow the original log conflict exception, the secondary exception should already have + // been logged. + throw getEnvironmentFacade().handleDatabaseException("Cannot remove message with id " + + messageId, e); + } + + + getLogger().warn("Lock timeout exception. Retrying (attempt " + + (attempts + 1) + " of " + LOCK_RETRY_ATTEMPTS + ") " + e); + + if(++attempts < LOCK_RETRY_ATTEMPTS) + { + if(rand == null) + { + rand = new Random(); + } + + try + { + Thread.sleep(500l + (long)(500l * rand.nextDouble())); + } + catch (InterruptedException e1) + { + + } + } + else + { + // rethrow the lock conflict exception since we could not solve by retrying + throw getEnvironmentFacade().handleDatabaseException("Cannot remove messages", e); + } + } + } + while(!complete); + } + catch (DatabaseException e) + { + getLogger().error("Unexpected BDB exception", e); + + try + { + abortTransactionSafely(tx, + getEnvironmentFacade()); + } + finally + { + tx = null; + } + + throw getEnvironmentFacade().handleDatabaseException("Error removing message with id " + + messageId + + " from database: " + + e.getMessage(), e); + } + finally + { + try + { + abortTransactionSafely(tx, + getEnvironmentFacade()); + } + finally + { + tx = null; + } + } + } + + + /** + * Fills the provided ByteBuffer with as much content for the specified message as possible, starting + * from the specified offset in the message. + * + * @param messageId The message to get the data for. + * @param offset The offset of the data within the message. + * @param dst The destination of the content read back + * + * @return The number of bytes inserted into the destination + * + * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException + { + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding contentTupleBinding = ContentBinding.getInstance(); + + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Message Id: " + messageId + " Getting content body from offset: " + offset); + } + + try + { + + int written = 0; + OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); + if (status == OperationStatus.SUCCESS) + { + byte[] dataAsBytes = contentTupleBinding.entryToObject(value); + int size = dataAsBytes.length; + if (offset > size) + { + throw new RuntimeException("Offset " + offset + " is greater than message size " + size + + " for message id " + messageId + "!"); + + } + + written = size - offset; + if(written > dst.remaining()) + { + written = dst.remaining(); + } + + dst.put(dataAsBytes, offset, written); + } + return written; + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " + + messageId + + " to database: " + + e.getMessage(), e); + } + } + + private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade) + { + Cursor cursor = null; + try + { + cursor = getMessageMetaDataDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + long messageId = LongBinding.entryToLong(key); + StorableMessageMetaData metaData = valueBinding.entryToObject(value); + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); + + if (!handler.handle(message)) + { + break; + } + } + } + catch (DatabaseException e) + { + throw environmentFacade.handleDatabaseException("Cannot visit messages", e); + } + finally + { + if (cursor != null) + { + try + { + cursor.close(); + } + catch(DatabaseException e) + { + throw environmentFacade.handleDatabaseException("Cannot close cursor", e); + } + } + } + } + + /** + * Stores a chunk of message data. + * + * @param tx The transaction for the operation. + * @param messageId The message to store the data for. + * @param offset The offset of the data chunk in the message. + * @param contentBody The content of the data chunk. + * + * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + private void addContent(final Transaction tx, long messageId, int offset, + ByteBuffer contentBody) throws StoreException + { + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding messageBinding = ContentBinding.getInstance(); + messageBinding.objectToEntry(contentBody.array(), value); + try + { + OperationStatus status = getMessageContentDb().put(tx, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error adding content for message id " + messageId + ": " + status); + } + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Storing content for message " + messageId + " in transaction " + tx); + + } + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Error writing AMQMessage with id " + + messageId + + " to database: " + + e.getMessage(), e); + } + } + + /** + * Stores message meta-data. + * + * @param tx The transaction for the operation. + * @param messageId The message to store the data for. + * @param messageMetaData The message meta data to store. + * + * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + private void storeMetaData(final Transaction tx, long messageId, + StorableMessageMetaData messageMetaData) + throws StoreException + { + if (getLogger().isDebugEnabled()) + { + getLogger().debug("storeMetaData called for transaction " + tx + + ", messageId " + messageId + + ", messageMetaData " + messageMetaData); + } + + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + + MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); + messageBinding.objectToEntry(messageMetaData, value); + try + { + getMessageMetaDataDb().put(tx, key, value); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Storing message metadata for message id " + messageId + " in transaction " + tx); + } + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Error writing message metadata with id " + + messageId + + " to database: " + + e.getMessage(), e); + } + } + + + /** + * Places a message onto a specified queue, in a given transaction. + * + * @param tx The transaction for the operation. + * @param queue The the queue to place the message on. + * @param messageId The message to enqueue. + * + * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. + */ + private void enqueueMessage(final Transaction tx, final TransactionLogResource queue, + long messageId) throws StoreException + { + + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); + keyBinding.objectToEntry(dd, key); + DatabaseEntry value = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0, value); + + try + { + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Enqueuing message " + messageId + " on queue " + + queue.getName() + " with id " + queue.getId() + " in transaction " + tx); + } + getDeliveryDb().put(tx, key, value); + } + catch (DatabaseException e) + { + getLogger().error("Failed to enqueue: " + e.getMessage(), e); + throw getEnvironmentFacade().handleDatabaseException("Error writing enqueued message with id " + + messageId + + " for queue " + + queue.getName() + + " with id " + + queue.getId() + + " to database", e); + } + } + + /** + * Extracts a message from a specified queue, in a given transaction. + * + * @param tx The transaction for the operation. + * @param queue The queue to take the message from. + * @param messageId The message to dequeue. + * + * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + private void dequeueMessage(final Transaction tx, final TransactionLogResource queue, + long messageId) throws StoreException + { + + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); + UUID id = queue.getId(); + keyBinding.objectToEntry(queueEntryKey, key); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Dequeue message id " + messageId + " from queue " + + queue.getName() + " with id " + id); + } + + try + { + + OperationStatus status = getDeliveryDb().delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + throw new StoreException("Unable to find message with id " + messageId + " on queue " + + queue.getName() + " with id " + id); + } + else if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Unable to remove message with id " + messageId + " on queue" + + queue.getName() + " with id " + id); + } + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Removed message " + messageId + " on queue " + + queue.getName() + " with id " + id); + + } + } + catch (DatabaseException e) + { + + getLogger().error("Failed to dequeue message " + messageId + " in transaction " + tx, e); + + throw getEnvironmentFacade().handleDatabaseException("Error accessing database while dequeuing message: " + + e.getMessage(), e); + } + } + + private void recordXid(Transaction txn, + long format, + byte[] globalId, + byte[] branchId, + org.apache.qpid.server.store.Transaction.Record[] enqueues, + org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidBinding keyBinding = XidBinding.getInstance(); + keyBinding.objectToEntry(xid,key); + + DatabaseEntry value = new DatabaseEntry(); + PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + valueBinding.objectToEntry(preparedTransaction, value); + + try + { + getXidDb().put(txn, key, value); + } + catch (DatabaseException e) + { + getLogger().error("Failed to write xid: " + e.getMessage(), e); + throw getEnvironmentFacade().handleDatabaseException("Error writing xid to database", e); + } + } + + private void removeXid(Transaction txn, long format, byte[] globalId, byte[] branchId) + throws StoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidBinding keyBinding = XidBinding.getInstance(); + + keyBinding.objectToEntry(xid, key); + + + try + { + + OperationStatus status = getXidDb().delete(txn, key); + if (status == OperationStatus.NOTFOUND) + { + throw new StoreException("Unable to find xid"); + } + else if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Unable to remove xid"); + } + + } + catch (DatabaseException e) + { + + getLogger().error("Failed to remove xid in transaction " + txn, e); + + throw getEnvironmentFacade().handleDatabaseException("Error accessing database while removing xid: " + + e.getMessage(), e); + } + } + + /** + * Commits all operations performed within a given transaction. + * + * @param tx The transaction to commit all operations for. + * + * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. + */ + private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException + { + if (tx == null) + { + throw new StoreException("Fatal internal error: transactional is null at commitTran"); + } + + StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit); + + if (getLogger().isDebugEnabled()) + { + String transactionType = syncCommit ? "synchronous" : "asynchronous"; + getLogger().debug("commitTranImpl completed " + transactionType + " transaction " + tx); + } + + return result; + } + + /** + * Abandons all operations performed within a given transaction. + * + * @param tx The transaction to abandon. + * + * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. + */ + private void abortTran(final Transaction tx) throws StoreException + { + if (getLogger().isDebugEnabled()) + { + getLogger().debug("abortTran called for transaction " + tx); + } + + try + { + tx.abort(); + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Error aborting transaction: " + e.getMessage(), e); + } + } + + private void storedSizeChangeOccurred(final int delta) throws StoreException + { + try + { + storedSizeChange(delta); + } + catch(DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Stored size change exception", e); + } + } + + private void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) + { + synchronized (this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 2*delta; + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + _totalStoreSize = getSizeOnDisk(); + + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + _totalStoreSize = getSizeOnDisk(); + + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(); + + _totalStoreSize = getSizeOnDisk(); + + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + } + } + + private void reduceSizeOnDisk() + { + getEnvironmentFacade().getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); + boolean cleaned = false; + while (getEnvironmentFacade().getEnvironment().cleanLog() > 0) + { + cleaned = true; + } + if (cleaned) + { + CheckpointConfig force = new CheckpointConfig(); + force.setForce(true); + getEnvironmentFacade().getEnvironment().checkpoint(force); + } + + + getEnvironmentFacade().getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); + } + + private long getSizeOnDisk() + { + return getEnvironmentFacade().getEnvironment().getStats(null).getTotalLogSize(); + } + + private Database getMessageContentDb() + { + return getEnvironmentFacade().openDatabase(MESSAGE_CONTENT_DB_NAME, DEFAULT_DATABASE_CONFIG); + } + + private Database getMessageMetaDataDb() + { + return getEnvironmentFacade().openDatabase(MESSAGE_META_DATA_DB_NAME, DEFAULT_DATABASE_CONFIG); + } + + private Database getMessageMetaDataSeqDb() + { + return getEnvironmentFacade().openDatabase(MESSAGE_META_DATA_SEQ_DB_NAME, DEFAULT_DATABASE_CONFIG); + } + + private Database getDeliveryDb() + { + return getEnvironmentFacade().openDatabase(DELIVERY_DB_NAME, DEFAULT_DATABASE_CONFIG); + } + + private Database getXidDb() + { + return getEnvironmentFacade().openDatabase(XID_DB_NAME, DEFAULT_DATABASE_CONFIG); + } + + protected abstract void checkMessageStoreOpen(); + + protected abstract ConfiguredObject<?> getParent(); + + protected abstract EnvironmentFacade getEnvironmentFacade(); + + protected abstract long getPersistentSizeLowThreshold(); + + protected abstract long getPersistentSizeHighThreshold(); + + protected abstract Logger getLogger(); + + private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + { + + private final long _messageId; + private final boolean _isRecovered; + + private T _metaData; + private volatile SoftReference<T> _metaDataRef; + + private byte[] _data; + private volatile SoftReference<byte[]> _dataRef; + + StoredBDBMessage(long messageId, T metaData) + { + this(messageId, metaData, false); + } + + StoredBDBMessage(long messageId, T metaData, boolean isRecovered) + { + _messageId = messageId; + _isRecovered = isRecovered; + + if(!_isRecovered) + { + _metaData = metaData; + } + _metaDataRef = new SoftReference<T>(metaData); + } + + @Override + public T getMetaData() + { + T metaData = _metaDataRef.get(); + if(metaData == null) + { + checkMessageStoreOpen(); + + metaData = (T) getMessageMetaData(_messageId); + _metaDataRef = new SoftReference<T>(metaData); + } + + return metaData; + } + + @Override + public long getMessageNumber() + { + return _messageId; + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + src = src.slice(); + + if(_data == null) + { + _data = new byte[src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + src.duplicate().get(_data); + } + else + { + byte[] oldData = _data; + _data = new byte[oldData.length + src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + + System.arraycopy(oldData, 0, _data, 0, oldData.length); + src.duplicate().get(_data, oldData.length, src.remaining()); + } + + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) + { + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; + } + else + { + checkMessageStoreOpen(); + + return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); + } + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) + { + return ByteBuffer.wrap(data,offsetInMessage,size); + } + else + { + ByteBuffer buf = ByteBuffer.allocate(size); + int length = getContent(offsetInMessage, buf); + buf.limit(length); + buf.position(0); + return buf; + } + } + + synchronized void store(Transaction txn) + { + if (!stored()) + { + try + { + _dataRef = new SoftReference<byte[]>(_data); + AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); + AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, + _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + } + finally + { + _metaData = null; + _data = null; + } + } + } + + @Override + public synchronized StoreFuture flushToStore() + { + if(!stored()) + { + checkMessageStoreOpen(); + + Transaction txn; + try + { + txn = getEnvironmentFacade().getEnvironment().beginTransaction( + null, null); + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e); + } + store(txn); + getEnvironmentFacade().commit(txn, true); + + storedSizeChangeOccurred(getMetaData().getContentSize()); + } + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + checkMessageStoreOpen(); + + int delta = getMetaData().getContentSize(); + removeMessage(_messageId, false); + storedSizeChangeOccurred(-delta); + } + + private boolean stored() + { + return _metaData == null || _isRecovered; + } + + @Override + public String toString() + { + return this.getClass() + "[messageId=" + _messageId + "]"; + } + } + + + private class BDBTransaction implements org.apache.qpid.server.store.Transaction + { + private Transaction _txn; + private int _storeSizeIncrease; + + private BDBTransaction() throws StoreException + { + try + { + _txn = getEnvironmentFacade().getEnvironment().beginTransaction(null, null); + } + catch(DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot create store transaction", e); + } + } + + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + { + checkMessageStoreOpen(); + + if(message.getStoredMessage() instanceof StoredBDBMessage) + { + final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + + AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); + } + + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + { + checkMessageStoreOpen(); + + AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); + } + + @Override + public void commitTran() throws StoreException + { + checkMessageStoreOpen(); + + AbstractBDBMessageStore.this.commitTranImpl(_txn, true); + AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); + } + + @Override + public StoreFuture commitTranAsync() throws StoreException + { + checkMessageStoreOpen(); + + AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); + return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + } + + @Override + public void abortTran() throws StoreException + { + checkMessageStoreOpen(); + + AbstractBDBMessageStore.this.abortTran(_txn); + } + + @Override + public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException + { + checkMessageStoreOpen(); + + AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId); + } + + @Override + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, + Record[] dequeues) throws StoreException + { + checkMessageStoreOpen(); + + AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); + } + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java index 37792cdd43..e8555dd819 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java @@ -20,96 +20,62 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely; +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely; +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG; + import java.io.File; -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.Cursor; import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.Sequence; -import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; -import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreProvider; -import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMemoryMessage; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.Xid; import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; -import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; -import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.util.FileUtils; /** - * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. - * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept - * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove - * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and - * dequeue messages to queues. <tr><td> Generate message identifiers. </table> + * Implementation of a DurableConfigurationStore backed by BDB JE + * that also provides a MessageStore. */ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfigurationStore { - public static final DatabaseConfig DEFAULT_DATABASE_CONFIG = new DatabaseConfig().setTransactional(true).setAllowCreate(true); - private static final Logger LOGGER = Logger.getLogger(BDBConfigurationStore.class); public static final int VERSION = 8; private static final String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; private static final String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY"; - private EnvironmentFacade _environmentFacade; - private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); private final EnvironmentFacadeFactory _environmentFacadeFactory; + private final ProvidedBDBMessageStore _providedMessageStore = new ProvidedBDBMessageStore(); + + private EnvironmentFacade _environmentFacade; + private String _storeLocation; - private final BDBMessageStore _messageStore = new BDBMessageStore(); private ConfiguredObject<?> _parent; public BDBConfigurationStore() @@ -168,7 +134,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi { throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e); } - } private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) @@ -211,8 +176,8 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } finally { - closeCursorSafely(objectsCursor); - closeCursorSafely(hierarchyCursor); + closeCursorSafely(objectsCursor, _environmentFacade); + closeCursorSafely(hierarchyCursor, _environmentFacade); } for (ConfiguredObjectRecord record : configuredObjects.values()) @@ -226,11 +191,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } - public String getStoreLocation() - { - return _storeLocation; - } - public EnvironmentFacade getEnvironmentFacade() { return _environmentFacade; @@ -241,7 +201,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi { if (_configurationStoreOpen.compareAndSet(true, false)) { - if (!_messageStore.isMessageStoreOpen()) + if (!_providedMessageStore.isMessageStoreOpen()) { closeEnvironment(); } @@ -264,38 +224,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } } - private void closeCursorSafely(Cursor cursor) throws StoreException - { - if (cursor != null) - { - try - { - cursor.close(); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot close cursor", e); - } - } - } - - private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx) - { - try - { - if (tx != null) - { - tx.abort(); - } - } - catch (DatabaseException e1) - { - // We need the possible side effect of the handler restarting the environment but don't care about the exception - _environmentFacade.handleDatabaseException(null, e1); - LOGGER.warn(errorMessage, e1); - } - } - @Override public void create(ConfiguredObjectRecord configuredObject) throws StoreException { @@ -323,7 +251,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi { if (txn != null) { - abortTransactionIgnoringException("Error creating configured object", txn); + abortTransactionSafely(txn, _environmentFacade); } } } @@ -359,7 +287,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi { if (txn != null) { - abortTransactionIgnoringException("Error deleting configured objects", txn); + abortTransactionSafely(txn, _environmentFacade); } } } @@ -388,7 +316,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi { if (txn != null) { - abortTransactionIgnoringException("Error updating configuration details within the store", txn); + abortTransactionSafely(txn, _environmentFacade); } } } @@ -512,7 +440,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi @Override public MessageStore getMessageStore() { - return _messageStore; + return _providedMessageStore; } private void checkConfigurationStoreOpen() @@ -523,9 +451,10 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } } + @Override public void onDelete() { - if (!isConfigurationStoreOpen() && !_messageStore.isMessageStoreOpen()) + if (!isConfigurationStoreOpen() && !_providedMessageStore.isMessageStoreOpen()) { if (_storeLocation != null) { @@ -561,37 +490,12 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi return _environmentFacade.openDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, DEFAULT_DATABASE_CONFIG); } - class BDBMessageStore implements MessageStore + class ProvidedBDBMessageStore extends AbstractBDBMessageStore { - private static final int LOCK_RETRY_ATTEMPTS = 5; - - private static final String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; - private static final String MESSAGE_META_DATA_SEQ_DB_NAME = "MESSAGE_METADATA.SEQ"; - private static final String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; - private static final String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; - - //TODO: Add upgrader to remove BRIDGES and LINKS - private static final String BRIDGEDB_NAME = "BRIDGES"; - private static final String LINKDB_NAME = "LINKS"; - private static final String XID_DB_NAME = "XIDS"; - - private final EventManager _eventManager = new EventManager(); - private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); - private final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes( - Charset.forName("UTF-8"))); - - private final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT. - setAllowCreate(true). - setInitialValue(1). - setWrap(true). - setCacheSize(100000); - - private boolean _limitBusted; private long _persistentSizeLowThreshold; private long _persistentSizeHighThreshold; - private long _totalStoreSize; private ConfiguredObject<?> _parent; @@ -606,79 +510,19 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi Object underfullAttr = messageStoreSettings.get(UNDERFULL_SIZE); _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + overfullAttr instanceof Number + ? ((Number) overfullAttr).longValue() + : Long.parseLong(overfullAttr.toString()); _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + underfullAttr instanceof Number + ? ((Number) underfullAttr).longValue() + : Long.parseLong(underfullAttr.toString()); - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) { _persistentSizeLowThreshold = _persistentSizeHighThreshold; } - - if (_environmentFacade == null) - { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(parent, messageStoreSettings); - _storeLocation = _environmentFacade.getStoreLocation(); - } - } - } - - @Override - public void upgradeStoreStructure() throws StoreException - { - try - { - new Upgrader(_environmentFacade.getEnvironment(), _parent).upgradeIfNecessary(); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot upgrade store", e); - } - - // TODO this relies on the fact that the VH will call upgrade just before putting the VH into service. - _totalStoreSize = getSizeOnDisk(); - } - - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) - { - - long newMessageId = getNextMessageSequenceNumber(); - - if (metaData.isPersistent()) - { - return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData); - } - else - { - return new StoredMemoryMessage<T>(newMessageId, metaData); - } - } - - private long getNextMessageSequenceNumber() - { - long newMessageId; - try - { - // The implementations of sequences mean that there is only a transaction - // after every n sequence values, where n is the MESSAGE_METADATA_SEQ_CONFIG.getCacheSize() - - Sequence mmdSeq = _environmentFacade.openSequence(getMessageMetaDataSeqDb(), - MESSAGE_METADATA_SEQ_KEY, - MESSAGE_METADATA_SEQ_CONFIG); - newMessageId = mmdSeq.get(null, 1); } - catch (DatabaseException de) - { - throw _environmentFacade.handleDatabaseException("Cannot get sequence value for new message", de); - } - return newMessageId; - } - - @Override - public boolean isPersistent() - { - return true; } public boolean isMessageStoreOpen() @@ -687,673 +531,49 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } @Override - public org.apache.qpid.server.store.Transaction newTransaction() - { - checkMessageStoreOpen(); - - return new BDBTransaction(); - } - - @Override public void closeMessageStore() { - if (_messageStoreOpen.compareAndSet(true, false) && !isConfigurationStoreOpen()) - { - closeEnvironment(); - } + _messageStoreOpen.set(false); } @Override - public void addEventListener(final EventListener eventListener, final Event... events) + public EnvironmentFacade getEnvironmentFacade() { - _eventManager.addEventListener(eventListener, events); + return _environmentFacade; } @Override public void onDelete() { - BDBConfigurationStore.this.onDelete(); + // Nothing to do, message store will be deleted when configuration store is deleted } @Override public String getStoreLocation() { - return BDBConfigurationStore.this.getStoreLocation(); + return BDBConfigurationStore.this._storeLocation; } @Override - public void visitMessages(final MessageHandler handler) throws StoreException + protected long getPersistentSizeLowThreshold() { - checkMessageStoreOpen(); - visitMessagesInternal(handler, _environmentFacade); + return _persistentSizeLowThreshold; } @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + protected long getPersistentSizeHighThreshold() { - checkMessageStoreOpen(); - - Cursor cursor = null; - List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - QueueEntryKey entry = keyBinding.entryToObject(key); - entries.add(entry); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e); - } - finally - { - closeCursorSafely(cursor); - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - if (!handler.handle(queueId, messageId)) - { - break; - } - } - + return _persistentSizeHighThreshold; } @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - try - { - cursor = getXidDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) - { - break; - } - } - - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e); - } - finally - { - closeCursorSafely(cursor); - } - } - - /** - * Retrieves message meta-data. - * - * @param messageId The message to get the meta-data for. - * - * @return The message meta data. - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = " - + messageId + "): called"); - } - - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); - - try - { - OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Metadata not found for message with id " + messageId); - } - - StorableMessageMetaData mdd = messageBinding.entryToObject(value); - - return mdd; - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e); - } - } - - void removeMessage(long messageId, boolean sync) throws StoreException - { - boolean complete = false; - com.sleepycat.je.Transaction tx = null; - - Random rand = null; - int attempts = 0; - try - { - do - { - tx = null; - try - { - tx = _environmentFacade.getEnvironment().beginTransaction(null, null); - - //remove the message meta data from the store - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removing message id " + messageId); - } - - - OperationStatus status = getMessageMetaDataDb().delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " + - messageId); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleted metadata for message " + messageId); - } - - //now remove the content data from the store if there is any. - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - getMessageContentDb().delete(tx, contentKeyEntry); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleted content for message " + messageId); - } - - _environmentFacade.commit(tx, sync); - - complete = true; - tx = null; - } - catch (LockConflictException e) - { - try - { - if(tx != null) - { - tx.abort(); - } - } - catch(DatabaseException e2) - { - LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2); - // rethrow the original log conflict exception, the secondary exception should already have - // been logged. - throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e); - } - - - LOGGER.warn("Lock timeout exception. Retrying (attempt " - + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); - - if(++attempts < LOCK_RETRY_ATTEMPTS) - { - if(rand == null) - { - rand = new Random(); - } - - try - { - Thread.sleep(500l + (long)(500l * rand.nextDouble())); - } - catch (InterruptedException e1) - { - - } - } - else - { - // rethrow the lock conflict exception since we could not solve by retrying - throw _environmentFacade.handleDatabaseException("Cannot remove messages", e); - } - } - } - while(!complete); - } - catch (DatabaseException e) - { - LOGGER.error("Unexpected BDB exception", e); - - try - { - abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx); - } - finally - { - tx = null; - } - - throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); - } - finally - { - try - { - abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx); - } - finally - { - tx = null; - } - } - } - - - /** - * Fills the provided ByteBuffer with as much content for the specified message as possible, starting - * from the specified offset in the message. - * - * @param messageId The message to get the data for. - * @param offset The offset of the data within the message. - * @param dst The destination of the content read back - * - * @return The number of bytes inserted into the destination - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException - { - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - DatabaseEntry value = new DatabaseEntry(); - ContentBinding contentTupleBinding = ContentBinding.getInstance(); - - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); - } - - try - { - - int written = 0; - OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); - if (status == OperationStatus.SUCCESS) - { - byte[] dataAsBytes = contentTupleBinding.entryToObject(value); - int size = dataAsBytes.length; - if (offset > size) - { - throw new RuntimeException("Offset " + offset + " is greater than message size " + size - + " for message id " + messageId + "!"); - - } - - written = size - offset; - if(written > dst.remaining()) - { - written = dst.remaining(); - } - - dst.put(dataAsBytes, offset, written); - } - return written; - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade) - { - Cursor cursor = null; - try - { - cursor = getMessageMetaDataDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - long messageId = LongBinding.entryToLong(key); - StorableMessageMetaData metaData = valueBinding.entryToObject(value); - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); - - if (!handler.handle(message)) - { - break; - } - } - } - catch (DatabaseException e) - { - throw environmentFacade.handleDatabaseException("Cannot visit messages", e); - } - finally - { - if (cursor != null) - { - try - { - cursor.close(); - } - catch(DatabaseException e) - { - throw environmentFacade.handleDatabaseException("Cannot close cursor", e); - } - } - } - } - - /** - * Stores a chunk of message data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param offset The offset of the data chunk in the message. - * @param contentBody The content of the data chunk. - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, - ByteBuffer contentBody) throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - ContentBinding messageBinding = ContentBinding.getInstance(); - messageBinding.objectToEntry(contentBody.array(), value); - try - { - OperationStatus status = getMessageContentDb().put(tx, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error adding content for message id " + messageId + ": " + status); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); - - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Stores message meta-data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param messageMetaData The message meta data to store. - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, - StorableMessageMetaData messageMetaData) - throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("storeMetaData called for transaction " + tx - + ", messageId " + messageId - + ", messageMetaData " + messageMetaData); - } - - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - - MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); - messageBinding.objectToEntry(messageMetaData, value); - try - { - getMessageMetaDataDb().put(tx, key, value); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - - /** - * Places a message onto a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The the queue to place the message on. - * @param messageId The message to enqueue. - * - * @throws StoreException If the operation fails for any reason. - */ - private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, - long messageId) throws StoreException - { - - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); - keyBinding.objectToEntry(dd, key); - DatabaseEntry value = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0, value); - - try - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Enqueuing message " + messageId + " on queue " - + queue.getName() + " with id " + queue.getId() + " in transaction " + tx); - } - getDeliveryDb().put(tx, key, value); - } - catch (DatabaseException e) - { - LOGGER.error("Failed to enqueue: " + e.getMessage(), e); - throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue " - + queue.getName() + " with id " + queue.getId() + " to database", e); - } - } - - /** - * Extracts a message from a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The queue to take the message from. - * @param messageId The message to dequeue. - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, - long messageId) throws StoreException - { - - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); - UUID id = queue.getId(); - keyBinding.objectToEntry(queueEntryKey, key); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Dequeue message id " + messageId + " from queue " - + queue.getName() + " with id " + id); - } - - try - { - - OperationStatus status = getDeliveryDb().delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Unable to find message with id " + messageId + " on queue " - + queue.getName() + " with id " + id); - } - else if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + queue.getName() + " with id " + id); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removed message " + messageId + " on queue " - + queue.getName() + " with id " + id); - - } - } - catch (DatabaseException e) - { - - LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e); - - throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e); - } - } - - private void recordXid(com.sleepycat.je.Transaction txn, - long format, - byte[] globalId, - byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - Xid xid = new Xid(format, globalId, branchId); - XidBinding keyBinding = XidBinding.getInstance(); - keyBinding.objectToEntry(xid,key); - - DatabaseEntry value = new DatabaseEntry(); - PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - valueBinding.objectToEntry(preparedTransaction, value); - - try - { - getXidDb().put(txn, key, value); - } - catch (DatabaseException e) - { - LOGGER.error("Failed to write xid: " + e.getMessage(), e); - throw _environmentFacade.handleDatabaseException("Error writing xid to database", e); - } - } - - private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId) - throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - Xid xid = new Xid(format, globalId, branchId); - XidBinding keyBinding = XidBinding.getInstance(); - - keyBinding.objectToEntry(xid, key); - - - try - { - - OperationStatus status = getXidDb().delete(txn, key); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Unable to find xid"); - } - else if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Unable to remove xid"); - } - - } - catch (DatabaseException e) - { - - LOGGER.error("Failed to remove xid in transaction " + txn, e); - - throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e); - } - } - - /** - * Commits all operations performed within a given transaction. - * - * @param tx The transaction to commit all operations for. - * - * @throws StoreException If the operation fails for any reason. - */ - private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException - { - if (tx == null) - { - throw new StoreException("Fatal internal error: transactional is null at commitTran"); - } - - StoreFuture result = _environmentFacade.commit(tx, syncCommit); - - if (LOGGER.isDebugEnabled()) - { - String transactionType = syncCommit ? "synchronous" : "asynchronous"; - LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); - } - - return result; - } - - /** - * Abandons all operations performed within a given transaction. - * - * @param tx The transaction to abandon. - * - * @throws StoreException If the operation fails for any reason. - */ - private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException + protected ConfiguredObject<?> getParent() { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("abortTran called for transaction " + tx); - } - - try - { - tx.abort(); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e); - } + return _parent; } - private void checkMessageStoreOpen() + @Override + protected void checkMessageStoreOpen() { if (!_messageStoreOpen.get()) { @@ -1361,384 +581,11 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } } - private void storedSizeChangeOccurred(final int delta) throws StoreException - { - try - { - storedSizeChange(delta); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Stored size change exception", e); - } - } - - private void storedSizeChange(final int delta) - { - if(getPersistentSizeHighThreshold() > 0) - { - synchronized (this) - { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 2*delta; - - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - _totalStoreSize = getSizeOnDisk(); - - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - } - else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) - { - long oldSize = _totalStoreSize; - _totalStoreSize = getSizeOnDisk(); - - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(); - - _totalStoreSize = getSizeOnDisk(); - - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - - } - } - } - } - - private long getPersistentSizeLowThreshold() - { - return _persistentSizeLowThreshold; - } - - private long getPersistentSizeHighThreshold() - { - return _persistentSizeHighThreshold; - } - - private void reduceSizeOnDisk() - { - _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); - boolean cleaned = false; - while (_environmentFacade.getEnvironment().cleanLog() > 0) - { - cleaned = true; - } - if (cleaned) - { - CheckpointConfig force = new CheckpointConfig(); - force.setForce(true); - _environmentFacade.getEnvironment().checkpoint(force); - } - - - _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); - } - - private long getSizeOnDisk() - { - return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize(); - } - - private Database getMessageContentDb() - { - return _environmentFacade.openDatabase(MESSAGE_CONTENT_DB_NAME, DEFAULT_DATABASE_CONFIG); - } - - private Database getMessageMetaDataDb() - { - return _environmentFacade.openDatabase(MESSAGE_META_DATA_DB_NAME, DEFAULT_DATABASE_CONFIG); - } - - private Database getMessageMetaDataSeqDb() - { - return _environmentFacade.openDatabase(MESSAGE_META_DATA_SEQ_DB_NAME, DEFAULT_DATABASE_CONFIG); - } - - private Database getDeliveryDb() - { - return _environmentFacade.openDatabase(DELIVERY_DB_NAME, DEFAULT_DATABASE_CONFIG); - } - - private Database getXidDb() - { - return _environmentFacade.openDatabase(XID_DB_NAME, DEFAULT_DATABASE_CONFIG); - } - - private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> - { - - private final long _messageId; - private final boolean _isRecovered; - - private T _metaData; - private volatile SoftReference<T> _metaDataRef; - - private byte[] _data; - private volatile SoftReference<byte[]> _dataRef; - - StoredBDBMessage(long messageId, T metaData) - { - this(messageId, metaData, false); - } - - StoredBDBMessage(long messageId, T metaData, boolean isRecovered) - { - _messageId = messageId; - _isRecovered = isRecovered; - - if(!_isRecovered) - { - _metaData = metaData; - } - _metaDataRef = new SoftReference<T>(metaData); - } - - @Override - public T getMetaData() - { - T metaData = _metaDataRef.get(); - if(metaData == null) - { - checkMessageStoreOpen(); - - metaData = (T) getMessageMetaData(_messageId); - _metaDataRef = new SoftReference<T>(metaData); - } - - return metaData; - } - - @Override - public long getMessageNumber() - { - return _messageId; - } - - @Override - public void addContent(int offsetInMessage, java.nio.ByteBuffer src) - { - src = src.slice(); - - if(_data == null) - { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - src.duplicate().get(_data); - } - else - { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - - System.arraycopy(oldData, 0, _data, 0, oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); - } - - } - - @Override - public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) - { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; - } - else - { - checkMessageStoreOpen(); - - return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); - } - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - return ByteBuffer.wrap(data,offsetInMessage,size); - } - else - { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.limit(length); - buf.position(0); - return buf; - } - } - - synchronized void store(com.sleepycat.je.Transaction txn) - { - if (!stored()) - { - try - { - _dataRef = new SoftReference<byte[]>(_data); - BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); - BDBMessageStore.this.addContent(txn, _messageId, 0, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); - } - finally - { - _metaData = null; - _data = null; - } - } - } - - @Override - public synchronized StoreFuture flushToStore() - { - if(!stored()) - { - checkMessageStoreOpen(); - - com.sleepycat.je.Transaction txn; - try - { - txn = _environmentFacade.getEnvironment().beginTransaction( - null, null); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("failed to begin transaction", e); - } - store(txn); - _environmentFacade.commit(txn, true); - - storedSizeChangeOccurred(getMetaData().getContentSize()); - } - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void remove() - { - checkMessageStoreOpen(); - - int delta = getMetaData().getContentSize(); - removeMessage(_messageId, false); - storedSizeChangeOccurred(-delta); - } - - private boolean stored() - { - return _metaData == null || _isRecovered; - } - - @Override - public String toString() - { - return this.getClass() + "[messageId=" + _messageId + "]"; - } - } - - - private class BDBTransaction implements org.apache.qpid.server.store.Transaction + @Override + protected Logger getLogger() { - private com.sleepycat.je.Transaction _txn; - private int _storeSizeIncrease; - - private BDBTransaction() throws StoreException - { - try - { - _txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e); - } - } - - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException - { - checkMessageStoreOpen(); - - if(message.getStoredMessage() instanceof StoredBDBMessage) - { - final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); - } - - BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); - } - - @Override - public void commitTran() throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.commitTranImpl(_txn, true); - BDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - } - - @Override - public StoreFuture commitTranAsync() throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - return BDBMessageStore.this.commitTranImpl(_txn, false); - } - - @Override - public void abortTran() throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.abortTran(_txn); - } - - @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.removeXid(_txn, format, globalId, branchId); - } - - @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, - Record[] dequeues) throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); - } + return LOGGER; } - } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java new file mode 100644 index 0000000000..c3969090be --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.store.berkeleydb; + + +import java.io.File; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.sleepycat.je.DatabaseException; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.util.FileUtils; + +/** + * Implementation of a MessageStore backed by BDB JE. + */ +public class BDBMessageStore extends AbstractBDBMessageStore +{ + private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); + + private final EnvironmentFacadeFactory _environmentFacadeFactory; + + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); + + private EnvironmentFacade _environmentFacade; + + private ConfiguredObject<?> _parent; + private String _storeLocation; + + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + + public BDBMessageStore() + { + this(new StandardEnvironmentFacadeFactory()); + } + + public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) + { + _environmentFacadeFactory = environmentFacadeFactory; + } + + @Override + public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + if (_messageStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + + Object overfullAttr = messageStoreSettings.get(OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(UNDERFULL_SIZE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(parent, messageStoreSettings); + _storeLocation = _environmentFacade.getStoreLocation(); + } + } + + @Override + public void closeMessageStore() + { + if (_messageStoreOpen.compareAndSet(true, false)) + { + if (_environmentFacade != null) + { + try + { + _environmentFacade.close(); + _environmentFacade = null; + } + catch (DatabaseException e) + { + throw new StoreException("Exception occurred on message store close", e); + } + } + } + } + + @Override + public void onDelete() + { + if (!_messageStoreOpen.get()) + { + if (_storeLocation != null) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting store " + _storeLocation); + } + + File location = new File(_storeLocation); + if (location.exists()) + { + if (!FileUtils.delete(location, true)) + { + LOGGER.error("Cannot delete " + _storeLocation); + } + } + } + } + } + + @Override + public EnvironmentFacade getEnvironmentFacade() + { + return _environmentFacade; + } + + @Override + protected long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + @Override + protected long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + + @Override + protected Logger getLogger() + { + return LOGGER; + } + + @Override + protected void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + + @Override + protected ConfiguredObject<?> getParent() + { + return _parent; + } + + @Override + public String getStoreLocation() + { + return _storeLocation; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java new file mode 100644 index 0000000000..8504971a7a --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.store.berkeleydb; + +import com.sleepycat.je.Cursor; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Transaction; + +import org.apache.qpid.server.store.StoreException; + +public class BDBUtils +{ + public static final DatabaseConfig DEFAULT_DATABASE_CONFIG = new DatabaseConfig().setTransactional(true).setAllowCreate(true); + + public static void closeCursorSafely(Cursor cursor, final EnvironmentFacade environmentFacade) throws StoreException + { + if (cursor != null) + { + try + { + cursor.close(); + } + catch(DatabaseException e) + { + // We need the possible side effect of the facade restarting the environment but don't care about the exception + throw environmentFacade.handleDatabaseException("Cannot close cursor", e); + } + } + } + + public static void abortTransactionSafely(Transaction tx, EnvironmentFacade environmentFacade) + { + try + { + if (tx != null) + { + tx.abort(); + } + } + catch (DatabaseException e) + { + // We need the possible side effect of the facade restarting the environment but don't care about the exception + environmentFacade.handleDatabaseException("Cannot abort transaction", e); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java index 2f1c9f9387..7c2dcc507d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @ManagedObject(category = false, type = BDBVirtualHost.VIRTUAL_HOST_TYPE) @@ -45,6 +46,6 @@ public class BDBVirtualHost extends AbstractVirtualHost<BDBVirtualHost> @Override protected MessageStore createMessageStore() { - return new BDBConfigurationStore().getMessageStore(); + return new BDBMessageStore(); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index f445171005..65fc0e9168 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -78,7 +78,7 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB @Override protected MessageStore createStore() throws Exception { - MessageStore store = (new BDBConfigurationStore()).getMessageStore(); + MessageStore store = new BDBMessageStore(); return store; } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index d623da846c..e40c6213cf 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -50,7 +50,7 @@ import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.util.FileUtils; -import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; /** * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against @@ -431,7 +431,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return new BDBConfigurationStore().getMessageStore(); + return new BDBMessageStore(); } } |
