diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:08 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:08 +0000 |
| commit | 39249098b7b374c5e45d7139aa8b9df3aebad385 (patch) | |
| tree | ab13b41b26d2036f5765e3a95b8692fe3903ce54 /qpid/java | |
| parent | 53fd008b70676ce1382bec414bcd0d86299a4ced (diff) | |
| download | qpid-python-39249098b7b374c5e45d7139aa8b9df3aebad385.tar.gz | |
QPID-5800: [Java Broker} Refactor MessageStore implementations extracting a MessageStoreProvider interface.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600931 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
42 files changed, 2639 insertions, 2328 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java new file mode 100644 index 0000000000..2766c2ac98 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java @@ -0,0 +1,1802 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.LockConflictException; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Transaction; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.EventManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMemoryMessage; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.Xid; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; +import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; +import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; +import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; +import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.util.FileUtils; + +/** + * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept + * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove + * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and + * dequeue messages to queues. <tr><td> Generate message identifiers. </table> + */ +public class BDBConfigurationStore implements MessageStoreProvider, DurableConfigurationStore +{ + private static final Logger LOGGER = Logger.getLogger(BDBConfigurationStore.class); + + public static final int VERSION = 8; + private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; + private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY"; + + private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; + private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; + private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; + + //TODO: Add upgrader to remove BRIDGES and LINKS + private static String BRIDGEDB_NAME = "BRIDGES"; + private static String LINKDB_NAME = "LINKS"; + private static String XID_DB_NAME = "XIDS"; + private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME}; + private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME }; + private EnvironmentFacade _environmentFacade; + private final AtomicLong _messageId = new AtomicLong(0); + + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); + private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); + + private long _totalStoreSize; + + private final EnvironmentFacadeFactory _environmentFacadeFactory; + + private volatile Committer _committer; + + private boolean _isMessageStoreProvider; + + private String _storeLocation; + private final BDBMessageStore _messageStoreFacade = new BDBMessageStore(); + + public BDBConfigurationStore() + { + this(new StandardEnvironmentFacadeFactory()); + } + + public BDBConfigurationStore(EnvironmentFacadeFactory environmentFacadeFactory) + { + _environmentFacadeFactory = environmentFacadeFactory; + } + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + { + if (_configurationStoreOpen.compareAndSet(false, true)) + { + if (_environmentFacade == null) + { + EnvironmentFacadeTask[] initialisationTasks = null; + _isMessageStoreProvider = MapValueConverter.getBooleanAttribute(VirtualHostNode.IS_MESSAGE_STORE_PROVIDER, storeSettings, false); + if (_isMessageStoreProvider) + { + String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; + System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length); + System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length); + initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask()}; + } + else + { + initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)}; + } + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks); + _storeLocation = _environmentFacade.getStoreLocation(); + } + else + { + throw new IllegalStateException("The database have been already opened as message store"); + } + } + } + + @Override + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) + { + checkConfigurationStoreOpen(); + + try + { + handler.begin(); + doVisitAllConfiguredObjectRecords(handler); + handler.end(); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e); + } + + } + + private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) + { + Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>(); + Cursor objectsCursor = null; + Cursor hierarchyCursor = null; + try + { + objectsCursor = getConfiguredObjectsDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + + while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + + BDBConfiguredObjectRecord configuredObject = + (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); + configuredObjects.put(configuredObject.getId(), configuredObject); + } + + // set parents + hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); + while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); + UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); + BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); + if(child != null) + { + ConfiguredObjectRecord parent = configuredObjects.get(parentId); + if(parent != null) + { + child.addParent(hk.getParentType(), parent); + } + } + } + } + finally + { + closeCursorSafely(objectsCursor); + closeCursorSafely(hierarchyCursor); + } + + for (ConfiguredObjectRecord record : configuredObjects.values()) + { + boolean shoudlContinue = handler.handle(record); + if (!shoudlContinue) + { + break; + } + } + + } + + public String getStoreLocation() + { + return _storeLocation; + } + + public EnvironmentFacade getEnvironmentFacade() + { + return _environmentFacade; + } + + @Override + public void closeConfigurationStore() throws StoreException + { + if (_configurationStoreOpen.compareAndSet(true, false)) + { + if (!_messageStoreOpen.get()) + { + closeEnvironment(); + } + } + } + + private void closeEnvironment() + { + if (_environmentFacade != null) + { + try + { + _environmentFacade.close(); + _environmentFacade = null; + } + catch(DatabaseException e) + { + throw new StoreException("Exception occurred on message store close", e); + } + } + } + + private void closeCursorSafely(Cursor cursor) throws StoreException + { + if (cursor != null) + { + try + { + cursor.close(); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot close cursor", e); + } + } + } + + private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx) + { + try + { + if (tx != null) + { + tx.abort(); + } + } + catch (DatabaseException e1) + { + // We need the possible side effect of the handler restarting the environment but don't care about the exception + _environmentFacade.handleDatabaseException(null, e1); + LOGGER.warn(errorMessage, e1); + } + } + + @Override + public void create(ConfiguredObjectRecord configuredObject) throws StoreException + { + checkConfigurationStoreOpen(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Create " + configuredObject); + } + + com.sleepycat.je.Transaction txn = null; + try + { + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + storeConfiguredObjectEntry(txn, configuredObject); + txn.commit(); + txn = null; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject + + " in database: " + e.getMessage(), e); + } + finally + { + if (txn != null) + { + abortTransactionIgnoringException("Error creating configured object", txn); + } + } + } + + @Override + public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException + { + checkConfigurationStoreOpen(); + + com.sleepycat.je.Transaction txn = null; + try + { + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + + Collection<UUID> removed = new ArrayList<UUID>(objects.length); + for(ConfiguredObjectRecord record : objects) + { + if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS) + { + removed.add(record.getId()); + } + } + + txn.commit(); + txn = null; + return removed.toArray(new UUID[removed.size()]); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e); + } + finally + { + if (txn != null) + { + abortTransactionIgnoringException("Error deleting configured objects", txn); + } + } + } + + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException + { + checkConfigurationStoreOpen(); + + com.sleepycat.je.Transaction txn = null; + try + { + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + for(ConfiguredObjectRecord record : records) + { + update(createIfNecessary, record, txn); + } + txn.commit(); + txn = null; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e); + } + finally + { + if (txn != null) + { + abortTransactionIgnoringException("Error updating configuration details within the store", txn); + } + } + } + + private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record); + } + + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); + keyBinding.objectToEntry(record.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + DatabaseEntry newValue = new DatabaseEntry(); + ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); + + OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT); + final boolean isNewRecord = status == OperationStatus.NOTFOUND; + if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord)) + { + // write the updated entry to the store + configuredObjectBinding.objectToEntry(record, newValue); + status = getConfiguredObjectsDb().put(txn, key, newValue); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error updating configuration details within the store: " + status); + } + if(isNewRecord) + { + writeHierarchyRecords(txn, record); + } + } + else if (status != OperationStatus.NOTFOUND) + { + throw new StoreException("Error finding configuration details within the store: " + status); + } + } + + + private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Storing configured object record: " + configuredObject); + } + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(configuredObject.getId(), key); + DatabaseEntry value = new DatabaseEntry(); + ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); + + queueBinding.objectToEntry(configuredObject, value); + try + { + OperationStatus status = getConfiguredObjectsDb().put(txn, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error writing configured object " + configuredObject + " to database: " + + status); + } + writeHierarchyRecords(txn, configuredObject); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject + + " to database: " + e.getMessage(), e); + } + } + + private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject) + { + OperationStatus status; + HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance(); + DatabaseEntry hierarchyKey = new DatabaseEntry(); + DatabaseEntry hierarchyValue = new DatabaseEntry(); + + for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet()) + { + + hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey); + UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue); + status = getConfiguredObjectHierarchyDb().put(txn, hierarchyKey, hierarchyValue); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: " + + status); + } + } + } + + private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException + { + UUID id = record.getId(); + Map<String, ConfiguredObjectRecord> parents = record.getParents(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Removing configured object: " + id); + } + DatabaseEntry key = new DatabaseEntry(); + + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(id, key); + OperationStatus status = getConfiguredObjectsDb().delete(tx, key); + if(status == OperationStatus.SUCCESS) + { + for(String parentType : parents.keySet()) + { + DatabaseEntry hierarchyKey = new DatabaseEntry(); + HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance(); + keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey); + getConfiguredObjectHierarchyDb().delete(tx, hierarchyKey); + } + } + return status; + } + + @Override + public MessageStore getMessageStore() + { + return _messageStoreFacade; + } + + private void checkConfigurationStoreOpen() + { + if (!_configurationStoreOpen.get()) + { + throw new IllegalStateException("Configuration store is not open"); + } + } + + public void onDelete() + { + if (!_configurationStoreOpen.get() && !_messageStoreOpen.get()) + { + if (_storeLocation != null) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting store " + _storeLocation); + } + + File location = new File(_storeLocation); + if (location.exists()) + { + if (!FileUtils.delete(location, true)) + { + LOGGER.error("Cannot delete " + _storeLocation); + } + } + } + } + } + + private Database getConfiguredObjectsDb() + { + return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); + } + + private Database getConfiguredObjectHierarchyDb() + { + return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME); + } + + private Database getMessageContentDb() + { + return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME); + } + + private Database getMessageMetaDataDb() + { + return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME); + } + + private Database getDeliveryDb() + { + return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME); + } + + private Database getXidDb() + { + return _environmentFacade.getOpenDatabase(XID_DB_NAME); + } + + class UpgradeTask implements EnvironmentFacadeTask + { + private final ConfiguredObject<?> _parent; + + public UpgradeTask(ConfiguredObject<?> parent) + { + _parent = parent; + } + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary(); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot upgrade store", e); + } + } + } + + class OpenDatabasesTask implements EnvironmentFacadeTask + { + private String[] _names; + + public OpenDatabasesTask(String[] names) + { + _names = names; + } + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + facade.openDatabases(dbConfig, _names); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot open databases", e); + } + } + + } + + class DiskSpaceTask implements EnvironmentFacadeTask + { + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize(); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot evaluate disk store size", e); + } + } + + } + + public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler + { + private long _maxId; + + @Override + public void execute(EnvironmentFacade facade) + { + ((BDBMessageStore)getMessageStore()).visitMessagesInternal(this, facade); + _messageId.set(_maxId); + } + + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + long id = storedMessage.getMessageNumber(); + if (_maxId<id) + { + _maxId = id; + } + return true; + } + } + + class BDBMessageStore implements MessageStore + { + private static final int LOCK_RETRY_ATTEMPTS = 5; + + private final EventManager _eventManager = new EventManager(); + + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + + @Override + public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + if (_messageStoreOpen.compareAndSet(false, true)) + { + + Object overfullAttr = messageStoreSettings.get(OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(UNDERFULL_SIZE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + + if (_environmentFacade == null) + { + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, + new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask()); + _storeLocation = _environmentFacade.getStoreLocation(); + } + + _committer = _environmentFacade.createCommitter(parent.getName()); + _committer.start(); + } + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + { + if (metaData.isPersistent()) + { + return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData); + } + else + { + return new StoredMemoryMessage<T>(getNewMessageId(), metaData); + } + } + + /** + * Return a valid, currently unused message id. + * + * @return A fresh message id. + */ + private long getNewMessageId() + { + return _messageId.incrementAndGet(); + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public org.apache.qpid.server.store.Transaction newTransaction() + { + checkMessageStoreOpen(); + + return new BDBTransaction(); + } + + @Override + public void closeMessageStore() + { + if (_messageStoreOpen.compareAndSet(true, false)) + { + try + { + if (_committer != null) + { + _committer.stop(); + } + } + finally + { + if (!_configurationStoreOpen.get()) + { + closeEnvironment(); + } + } + } + } + + @Override + public void addEventListener(final EventListener eventListener, final Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + + @Override + public void onDelete() + { + BDBConfigurationStore.this.onDelete(); + } + + @Override + public String getStoreLocation() + { + return BDBConfigurationStore.this.getStoreLocation(); + } + + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + checkMessageStoreOpen(); + visitMessagesInternal(handler, _environmentFacade); + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + entries.add(entry); + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(queueId, messageId)) + { + break; + } + } + + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + try + { + cursor = getXidDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidBinding keyBinding = XidBinding.getInstance(); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), + preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) + { + break; + } + } + + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e); + } + finally + { + closeCursorSafely(cursor); + } + } + + /** + * Retrieves message meta-data. + * + * @param messageId The message to get the meta-data for. + * + * @return The message meta data. + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = " + + messageId + "): called"); + } + + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); + + try + { + OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Metadata not found for message with id " + messageId); + } + + StorableMessageMetaData mdd = messageBinding.entryToObject(value); + + return mdd; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e); + } + } + + void removeMessage(long messageId, boolean sync) throws StoreException + { + boolean complete = false; + com.sleepycat.je.Transaction tx = null; + + Random rand = null; + int attempts = 0; + try + { + do + { + tx = null; + try + { + tx = _environmentFacade.getEnvironment().beginTransaction(null, null); + + //remove the message meta data from the store + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Removing message id " + messageId); + } + + + OperationStatus status = getMessageMetaDataDb().delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " + + messageId); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleted metadata for message " + messageId); + } + + //now remove the content data from the store if there is any. + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + getMessageContentDb().delete(tx, contentKeyEntry); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleted content for message " + messageId); + } + + _environmentFacade.commit(tx); + _committer.commit(tx, sync); + + complete = true; + tx = null; + } + catch (LockConflictException e) + { + try + { + if(tx != null) + { + tx.abort(); + } + } + catch(DatabaseException e2) + { + LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2); + // rethrow the original log conflict exception, the secondary exception should already have + // been logged. + throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e); + } + + + LOGGER.warn("Lock timeout exception. Retrying (attempt " + + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); + + if(++attempts < LOCK_RETRY_ATTEMPTS) + { + if(rand == null) + { + rand = new Random(); + } + + try + { + Thread.sleep(500l + (long)(500l * rand.nextDouble())); + } + catch (InterruptedException e1) + { + + } + } + else + { + // rethrow the lock conflict exception since we could not solve by retrying + throw _environmentFacade.handleDatabaseException("Cannot remove messages", e); + } + } + } + while(!complete); + } + catch (DatabaseException e) + { + LOGGER.error("Unexpected BDB exception", e); + + try + { + abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx); + } + finally + { + tx = null; + } + + throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); + } + finally + { + try + { + abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx); + } + finally + { + tx = null; + } + } + } + + + /** + * Fills the provided ByteBuffer with as much content for the specified message as possible, starting + * from the specified offset in the message. + * + * @param messageId The message to get the data for. + * @param offset The offset of the data within the message. + * @param dst The destination of the content read back + * + * @return The number of bytes inserted into the destination + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException + { + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding contentTupleBinding = ContentBinding.getInstance(); + + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); + } + + try + { + + int written = 0; + OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); + if (status == OperationStatus.SUCCESS) + { + byte[] dataAsBytes = contentTupleBinding.entryToObject(value); + int size = dataAsBytes.length; + if (offset > size) + { + throw new RuntimeException("Offset " + offset + " is greater than message size " + size + + " for message id " + messageId + "!"); + + } + + written = size - offset; + if(written > dst.remaining()) + { + written = dst.remaining(); + } + + dst.put(dataAsBytes, offset, written); + } + return written; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); + } + } + + private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade) + { + Cursor cursor = null; + try + { + cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + long messageId = LongBinding.entryToLong(key); + StorableMessageMetaData metaData = valueBinding.entryToObject(value); + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); + + if (!handler.handle(message)) + { + break; + } + } + } + catch (DatabaseException e) + { + throw environmentFacade.handleDatabaseException("Cannot visit messages", e); + } + finally + { + if (cursor != null) + { + try + { + cursor.close(); + } + catch(DatabaseException e) + { + throw environmentFacade.handleDatabaseException("Cannot close cursor", e); + } + } + } + } + + /** + * Stores a chunk of message data. + * + * @param tx The transaction for the operation. + * @param messageId The message to store the data for. + * @param offset The offset of the data chunk in the message. + * @param contentBody The content of the data chunk. + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, + ByteBuffer contentBody) throws StoreException + { + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding messageBinding = ContentBinding.getInstance(); + messageBinding.objectToEntry(contentBody.array(), value); + try + { + OperationStatus status = getMessageContentDb().put(tx, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error adding content for message id " + messageId + ": " + status); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); + + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); + } + } + + /** + * Stores message meta-data. + * + * @param tx The transaction for the operation. + * @param messageId The message to store the data for. + * @param messageMetaData The message meta data to store. + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, + StorableMessageMetaData messageMetaData) + throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("storeMetaData called for transaction " + tx + + ", messageId " + messageId + + ", messageMetaData " + messageMetaData); + } + + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + + MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); + messageBinding.objectToEntry(messageMetaData, value); + try + { + getMessageMetaDataDb().put(tx, key, value); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e); + } + } + + + /** + * Places a message onto a specified queue, in a given transaction. + * + * @param tx The transaction for the operation. + * @param queue The the queue to place the message on. + * @param messageId The message to enqueue. + * + * @throws StoreException If the operation fails for any reason. + */ + private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + long messageId) throws StoreException + { + + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); + keyBinding.objectToEntry(dd, key); + DatabaseEntry value = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0, value); + + try + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Enqueuing message " + messageId + " on queue " + + queue.getName() + " with id " + queue.getId() + " in transaction " + tx); + } + getDeliveryDb().put(tx, key, value); + } + catch (DatabaseException e) + { + LOGGER.error("Failed to enqueue: " + e.getMessage(), e); + throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue " + + queue.getName() + " with id " + queue.getId() + " to database", e); + } + } + + /** + * Extracts a message from a specified queue, in a given transaction. + * + * @param tx The transaction for the operation. + * @param queue The queue to take the message from. + * @param messageId The message to dequeue. + * + * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. + */ + private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + long messageId) throws StoreException + { + + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); + UUID id = queue.getId(); + keyBinding.objectToEntry(queueEntryKey, key); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Dequeue message id " + messageId + " from queue " + + queue.getName() + " with id " + id); + } + + try + { + + OperationStatus status = getDeliveryDb().delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + throw new StoreException("Unable to find message with id " + messageId + " on queue " + + queue.getName() + " with id " + id); + } + else if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Unable to remove message with id " + messageId + " on queue" + + queue.getName() + " with id " + id); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Removed message " + messageId + " on queue " + + queue.getName() + " with id " + id); + + } + } + catch (DatabaseException e) + { + + LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e); + + throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e); + } + } + + private void recordXid(com.sleepycat.je.Transaction txn, + long format, + byte[] globalId, + byte[] branchId, + org.apache.qpid.server.store.Transaction.Record[] enqueues, + org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidBinding keyBinding = XidBinding.getInstance(); + keyBinding.objectToEntry(xid,key); + + DatabaseEntry value = new DatabaseEntry(); + PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + valueBinding.objectToEntry(preparedTransaction, value); + + try + { + getXidDb().put(txn, key, value); + } + catch (DatabaseException e) + { + LOGGER.error("Failed to write xid: " + e.getMessage(), e); + throw _environmentFacade.handleDatabaseException("Error writing xid to database", e); + } + } + + private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId) + throws StoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidBinding keyBinding = XidBinding.getInstance(); + + keyBinding.objectToEntry(xid, key); + + + try + { + + OperationStatus status = getXidDb().delete(txn, key); + if (status == OperationStatus.NOTFOUND) + { + throw new StoreException("Unable to find xid"); + } + else if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Unable to remove xid"); + } + + } + catch (DatabaseException e) + { + + LOGGER.error("Failed to remove xid in transaction " + txn, e); + + throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e); + } + } + + /** + * Commits all operations performed within a given transaction. + * + * @param tx The transaction to commit all operations for. + * + * @throws StoreException If the operation fails for any reason. + */ + private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException + { + if (tx == null) + { + throw new StoreException("Fatal internal error: transactional is null at commitTran"); + } + + _environmentFacade.commit(tx); + StoreFuture result = _committer.commit(tx, syncCommit); + + if (LOGGER.isDebugEnabled()) + { + String transactionType = syncCommit ? "synchronous" : "asynchronous"; + LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); + } + + return result; + } + + /** + * Abandons all operations performed within a given transaction. + * + * @param tx The transaction to abandon. + * + * @throws StoreException If the operation fails for any reason. + */ + private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("abortTran called for transaction " + tx); + } + + try + { + tx.abort(); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e); + } + } + + private void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + + private void storedSizeChangeOccurred(final int delta) throws StoreException + { + try + { + storedSizeChange(delta); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Stored size change exception", e); + } + } + + private void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) + { + synchronized (this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 2*delta; + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + _totalStoreSize = getSizeOnDisk(); + + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + _totalStoreSize = getSizeOnDisk(); + + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(); + + _totalStoreSize = getSizeOnDisk(); + + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + } + } + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + + private void reduceSizeOnDisk() + { + _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); + boolean cleaned = false; + while (_environmentFacade.getEnvironment().cleanLog() > 0) + { + cleaned = true; + } + if (cleaned) + { + CheckpointConfig force = new CheckpointConfig(); + force.setForce(true); + _environmentFacade.getEnvironment().checkpoint(force); + } + + + _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); + } + + private long getSizeOnDisk() + { + return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize(); + } + + private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + { + + private final long _messageId; + private final boolean _isRecovered; + + private T _metaData; + private volatile SoftReference<T> _metaDataRef; + + private byte[] _data; + private volatile SoftReference<byte[]> _dataRef; + + StoredBDBMessage(long messageId, T metaData) + { + this(messageId, metaData, false); + } + + StoredBDBMessage(long messageId, T metaData, boolean isRecovered) + { + _messageId = messageId; + _isRecovered = isRecovered; + + if(!_isRecovered) + { + _metaData = metaData; + } + _metaDataRef = new SoftReference<T>(metaData); + } + + @Override + public T getMetaData() + { + T metaData = _metaDataRef.get(); + if(metaData == null) + { + checkMessageStoreOpen(); + + metaData = (T) getMessageMetaData(_messageId); + _metaDataRef = new SoftReference<T>(metaData); + } + + return metaData; + } + + @Override + public long getMessageNumber() + { + return _messageId; + } + + @Override + public void addContent(int offsetInMessage, java.nio.ByteBuffer src) + { + src = src.slice(); + + if(_data == null) + { + _data = new byte[src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + src.duplicate().get(_data); + } + else + { + byte[] oldData = _data; + _data = new byte[oldData.length + src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + + System.arraycopy(oldData, 0, _data, 0, oldData.length); + src.duplicate().get(_data, oldData.length, src.remaining()); + } + + } + + @Override + public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) + { + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) + { + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; + } + else + { + checkMessageStoreOpen(); + + return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); + } + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) + { + return ByteBuffer.wrap(data,offsetInMessage,size); + } + else + { + ByteBuffer buf = ByteBuffer.allocate(size); + int length = getContent(offsetInMessage, buf); + buf.limit(length); + buf.position(0); + return buf; + } + } + + synchronized void store(com.sleepycat.je.Transaction txn) + { + if (!stored()) + { + try + { + _dataRef = new SoftReference<byte[]>(_data); + BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); + BDBMessageStore.this.addContent(txn, _messageId, 0, + _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + } + finally + { + _metaData = null; + _data = null; + } + } + } + + @Override + public synchronized StoreFuture flushToStore() + { + if(!stored()) + { + checkMessageStoreOpen(); + + com.sleepycat.je.Transaction txn; + try + { + txn = _environmentFacade.getEnvironment().beginTransaction( + null, null); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("failed to begin transaction", e); + } + store(txn); + _environmentFacade.commit(txn); + _committer.commit(txn, true); + + storedSizeChangeOccurred(getMetaData().getContentSize()); + } + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + checkMessageStoreOpen(); + + int delta = getMetaData().getContentSize(); + removeMessage(_messageId, false); + storedSizeChangeOccurred(-delta); + } + + private boolean stored() + { + return _metaData == null || _isRecovered; + } + + @Override + public String toString() + { + return this.getClass() + "[messageId=" + _messageId + "]"; + } + } + + + private class BDBTransaction implements org.apache.qpid.server.store.Transaction + { + private com.sleepycat.je.Transaction _txn; + private int _storeSizeIncrease; + + private BDBTransaction() throws StoreException + { + try + { + _txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e); + } + } + + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + { + checkMessageStoreOpen(); + + if(message.getStoredMessage() instanceof StoredBDBMessage) + { + final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + + BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); + } + + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + { + checkMessageStoreOpen(); + + BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); + } + + @Override + public void commitTran() throws StoreException + { + checkMessageStoreOpen(); + + BDBMessageStore.this.commitTranImpl(_txn, true); + BDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); + } + + @Override + public StoreFuture commitTranAsync() throws StoreException + { + checkMessageStoreOpen(); + + BDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); + return BDBMessageStore.this.commitTranImpl(_txn, false); + } + + @Override + public void abortTran() throws StoreException + { + checkMessageStoreOpen(); + + BDBMessageStore.this.abortTran(_txn); + } + + @Override + public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException + { + checkMessageStoreOpen(); + + BDBMessageStore.this.removeXid(_txn, format, globalId, branchId); + } + + @Override + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, + Record[] dequeues) throws StoreException + { + checkMessageStoreOpen(); + + BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); + } + } + + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java deleted file mode 100644 index 9fc90f9d7c..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ /dev/null @@ -1,1766 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.EventManager; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMemoryMessage; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.Xid; -import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; -import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; -import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; -import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.util.FileUtils; - -import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.CheckpointConfig; -import com.sleepycat.je.Cursor; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.LockConflictException; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.Transaction; - -/** - * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. - * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept - * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove - * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and - * dequeue messages to queues. <tr><td> Generate message identifiers. </table> - */ -public class BDBMessageStore implements MessageStore, DurableConfigurationStore -{ - private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - - public static final int VERSION = 8; - private static final int LOCK_RETRY_ATTEMPTS = 5; - private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; - private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY"; - - private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; - private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; - private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; - - //TODO: Add upgrader to remove BRIDGES and LINKS - private static String BRIDGEDB_NAME = "BRIDGES"; - private static String LINKDB_NAME = "LINKS"; - private static String XID_DB_NAME = "XIDS"; - private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME}; - private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME }; - - private EnvironmentFacade _environmentFacade; - private final AtomicLong _messageId = new AtomicLong(0); - - private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); - private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); - - private long _totalStoreSize; - private boolean _limitBusted; - private long _persistentSizeLowThreshold; - private long _persistentSizeHighThreshold; - - private final EventManager _eventManager = new EventManager(); - - private final EnvironmentFacadeFactory _environmentFacadeFactory; - - private volatile Committer _committer; - - private boolean _isMessageStoreProvider; - - private String _storeLocation; - - public BDBMessageStore() - { - this(new StandardEnvironmentFacadeFactory()); - } - - public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) - { - _environmentFacadeFactory = environmentFacadeFactory; - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - _eventManager.addEventListener(eventListener, events); - } - - @Override - public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) - { - if (_configurationStoreOpen.compareAndSet(false, true)) - { - if (_environmentFacade == null) - { - EnvironmentFacadeTask[] initialisationTasks = null; - _isMessageStoreProvider = MapValueConverter.getBooleanAttribute(VirtualHostNode.IS_MESSAGE_STORE_PROVIDER, storeSettings, false); - if (_isMessageStoreProvider) - { - String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; - System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length); - System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length); - initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() }; - } - else - { - initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)}; - } - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks); - _storeLocation = _environmentFacade.getStoreLocation(); - } - else - { - throw new IllegalStateException("The database have been already opened as message store"); - } - } - } - - @Override - public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) - { - checkConfigurationStoreOpen(); - - try - { - handler.begin(); - doVisitAllConfiguredObjectRecords(handler); - handler.end(); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e); - } - - } - - private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) - { - Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>(); - Cursor objectsCursor = null; - Cursor hierarchyCursor = null; - try - { - objectsCursor = getConfiguredObjectsDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - - while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - BDBConfiguredObjectRecord configuredObject = - (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); - configuredObjects.put(configuredObject.getId(), configuredObject); - } - - // set parents - hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); - while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); - UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); - BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); - if(child != null) - { - ConfiguredObjectRecord parent = configuredObjects.get(parentId); - if(parent != null) - { - child.addParent(hk.getParentType(), parent); - } - } - } - } - finally - { - closeCursorSafely(objectsCursor); - closeCursorSafely(hierarchyCursor); - } - - for (ConfiguredObjectRecord record : configuredObjects.values()) - { - boolean shoudlContinue = handler.handle(record); - if (!shoudlContinue) - { - break; - } - } - - } - - @Override - public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) throws StoreException - { - if (_messageStoreOpen.compareAndSet(false, true)) - { - - Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); - Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); - - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - - if (_environmentFacade == null) - { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, - new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask()); - _storeLocation = _environmentFacade.getStoreLocation(); - } - - _committer = _environmentFacade.createCommitter(parent.getName()); - _committer.start(); - } - } - - @Override - public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException - { - checkMessageStoreOpen(); - - return new BDBTransaction(); - } - - @Override - public String getStoreLocation() - { - return _storeLocation; - } - - public EnvironmentFacade getEnvironmentFacade() - { - return _environmentFacade; - } - - @Override - public void closeMessageStore() throws StoreException - { - if (_messageStoreOpen.compareAndSet(true, false)) - { - try - { - if (_committer != null) - { - _committer.close(); - } - } - finally - { - if (!_configurationStoreOpen.get()) - { - closeEnvironment(); - } - } - } - } - - @Override - public void closeConfigurationStore() throws StoreException - { - if (_configurationStoreOpen.compareAndSet(true, false)) - { - if (!_messageStoreOpen.get()) - { - closeEnvironment(); - } - } - } - - private void closeEnvironment() - { - if (_environmentFacade != null) - { - try - { - _environmentFacade.close(); - _environmentFacade = null; - } - catch(DatabaseException e) - { - throw new StoreException("Exception occured on message store close", e); - } - } - } - - private void closeCursorSafely(Cursor cursor) throws StoreException - { - if (cursor != null) - { - try - { - cursor.close(); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot close cursor", e); - } - } - } - - - void removeMessage(long messageId, boolean sync) throws StoreException - { - boolean complete = false; - com.sleepycat.je.Transaction tx = null; - - Random rand = null; - int attempts = 0; - try - { - do - { - tx = null; - try - { - tx = _environmentFacade.beginTransaction(); - - //remove the message meta data from the store - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removing message id " + messageId); - } - - - OperationStatus status = getMessageMetaDataDb().delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " + - messageId); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleted metadata for message " + messageId); - } - - //now remove the content data from the store if there is any. - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - getMessageContentDb().delete(tx, contentKeyEntry); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleted content for message " + messageId); - } - - _environmentFacade.commit(tx); - _committer.commit(tx, sync); - - complete = true; - tx = null; - } - catch (LockConflictException e) - { - try - { - if(tx != null) - { - tx.abort(); - } - } - catch(DatabaseException e2) - { - LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2); - // rethrow the original log conflict exception, the secondary exception should already have - // been logged. - throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e); - } - - - LOGGER.warn("Lock timeout exception. Retrying (attempt " - + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); - - if(++attempts < LOCK_RETRY_ATTEMPTS) - { - if(rand == null) - { - rand = new Random(); - } - - try - { - Thread.sleep(500l + (long)(500l * rand.nextDouble())); - } - catch (InterruptedException e1) - { - - } - } - else - { - // rethrow the lock conflict exception since we could not solve by retrying - throw _environmentFacade.handleDatabaseException("Cannot remove messages", e); - } - } - } - while(!complete); - } - catch (DatabaseException e) - { - LOGGER.error("Unexpected BDB exception", e); - - try - { - abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx); - } - finally - { - tx = null; - } - - throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); - } - finally - { - try - { - abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx); - } - finally - { - tx = null; - } - } - } - - private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx) - { - try - { - if (tx != null) - { - tx.abort(); - } - } - catch (DatabaseException e1) - { - // We need the possible side effect of the handler restarting the environment but don't care about the exception - _environmentFacade.handleDatabaseException(null, e1); - LOGGER.warn(errorMessage, e1); - } - } - - @Override - public void create(ConfiguredObjectRecord configuredObject) throws StoreException - { - checkConfigurationStoreOpen(); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Create " + configuredObject); - } - - com.sleepycat.je.Transaction txn = null; - try - { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - storeConfiguredObjectEntry(txn, configuredObject); - txn.commit(); - txn = null; - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject - + " in database: " + e.getMessage(), e); - } - finally - { - if (txn != null) - { - abortTransactionIgnoringException("Error creating configured object", txn); - } - } - } - - @Override - public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException - { - checkConfigurationStoreOpen(); - - com.sleepycat.je.Transaction txn = null; - try - { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - - Collection<UUID> removed = new ArrayList<UUID>(objects.length); - for(ConfiguredObjectRecord record : objects) - { - if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS) - { - removed.add(record.getId()); - } - } - - txn.commit(); - txn = null; - return removed.toArray(new UUID[removed.size()]); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e); - } - finally - { - if (txn != null) - { - abortTransactionIgnoringException("Error deleting configured objects", txn); - } - } - - } - - @Override - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException - { - checkConfigurationStoreOpen(); - - com.sleepycat.je.Transaction txn = null; - try - { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - for(ConfiguredObjectRecord record : records) - { - update(createIfNecessary, record, txn); - } - txn.commit(); - txn = null; - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e); - } - finally - { - if (txn != null) - { - abortTransactionIgnoringException("Error updating configuration details within the store", txn); - } - } - - } - - private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record); - } - - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(record.getId(), key); - - DatabaseEntry value = new DatabaseEntry(); - DatabaseEntry newValue = new DatabaseEntry(); - ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - - OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT); - final boolean isNewRecord = status == OperationStatus.NOTFOUND; - if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord)) - { - // write the updated entry to the store - configuredObjectBinding.objectToEntry(record, newValue); - status = getConfiguredObjectsDb().put(txn, key, newValue); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error updating configuration details within the store: " + status); - } - if(isNewRecord) - { - writeHierarchyRecords(txn, record); - } - } - else if (status != OperationStatus.NOTFOUND) - { - throw new StoreException("Error finding configuration details within the store: " + status); - } - } - - /** - * Places a message onto a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The the queue to place the message on. - * @param messageId The message to enqueue. - * - * @throws StoreException If the operation fails for any reason. - */ - private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, - long messageId) throws StoreException - { - - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); - keyBinding.objectToEntry(dd, key); - DatabaseEntry value = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0, value); - - try - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Enqueuing message " + messageId + " on queue " - + queue.getName() + " with id " + queue.getId() + " in transaction " + tx); - } - getDeliveryDb().put(tx, key, value); - } - catch (DatabaseException e) - { - LOGGER.error("Failed to enqueue: " + e.getMessage(), e); - throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue " - + queue.getName() + " with id " + queue.getId() + " to database", e); - } - } - - /** - * Extracts a message from a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The queue to take the message from. - * @param messageId The message to dequeue. - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, - long messageId) throws StoreException - { - - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); - UUID id = queue.getId(); - keyBinding.objectToEntry(queueEntryKey, key); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Dequeue message id " + messageId + " from queue " - + queue.getName() + " with id " + id); - } - - try - { - - OperationStatus status = getDeliveryDb().delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Unable to find message with id " + messageId + " on queue " - + queue.getName() + " with id " + id); - } - else if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + queue.getName() + " with id " + id); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removed message " + messageId + " on queue " - + queue.getName() + " with id " + id); - - } - } - catch (DatabaseException e) - { - - LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e); - - throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e); - } - } - - - private void recordXid(com.sleepycat.je.Transaction txn, - long format, - byte[] globalId, - byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - Xid xid = new Xid(format, globalId, branchId); - XidBinding keyBinding = XidBinding.getInstance(); - keyBinding.objectToEntry(xid,key); - - DatabaseEntry value = new DatabaseEntry(); - PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - valueBinding.objectToEntry(preparedTransaction, value); - - try - { - getXidDb().put(txn, key, value); - } - catch (DatabaseException e) - { - LOGGER.error("Failed to write xid: " + e.getMessage(), e); - throw _environmentFacade.handleDatabaseException("Error writing xid to database", e); - } - } - - private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId) - throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - Xid xid = new Xid(format, globalId, branchId); - XidBinding keyBinding = XidBinding.getInstance(); - - keyBinding.objectToEntry(xid, key); - - - try - { - - OperationStatus status = getXidDb().delete(txn, key); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Unable to find xid"); - } - else if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Unable to remove xid"); - } - - } - catch (DatabaseException e) - { - - LOGGER.error("Failed to remove xid in transaction " + txn, e); - - throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e); - } - } - - /** - * Commits all operations performed within a given transaction. - * - * @param tx The transaction to commit all operations for. - * - * @throws StoreException If the operation fails for any reason. - */ - private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException - { - if (tx == null) - { - throw new StoreException("Fatal internal error: transactional is null at commitTran"); - } - - _environmentFacade.commit(tx); - StoreFuture result = _committer.commit(tx, syncCommit); - - if (LOGGER.isDebugEnabled()) - { - String transactionType = syncCommit ? "synchronous" : "asynchronous"; - LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); - } - - return result; - } - - /** - * Abandons all operations performed within a given transaction. - * - * @param tx The transaction to abandon. - * - * @throws StoreException If the operation fails for any reason. - */ - private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("abortTran called for transaction " + tx); - } - - try - { - tx.abort(); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e); - } - } - - /** - * Return a valid, currently unused message id. - * - * @return A fresh message id. - */ - private long getNewMessageId() - { - return _messageId.incrementAndGet(); - } - - /** - * Stores a chunk of message data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param offset The offset of the data chunk in the message. - * @param contentBody The content of the data chunk. - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, - ByteBuffer contentBody) throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - ContentBinding messageBinding = ContentBinding.getInstance(); - messageBinding.objectToEntry(contentBody.array(), value); - try - { - OperationStatus status = getMessageContentDb().put(tx, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error adding content for message id " + messageId + ": " + status); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); - - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Stores message meta-data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param messageMetaData The message meta data to store. - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, - StorableMessageMetaData messageMetaData) - throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("storeMetaData called for transaction " + tx - + ", messageId " + messageId - + ", messageMetaData " + messageMetaData); - } - - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - - MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); - messageBinding.objectToEntry(messageMetaData, value); - try - { - getMessageMetaDataDb().put(tx, key, value); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Retrieves message meta-data. - * - * @param messageId The message to get the meta-data for. - * - * @return The message meta data. - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = " - + messageId + "): called"); - } - - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); - - try - { - OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Metadata not found for message with id " + messageId); - } - - StorableMessageMetaData mdd = messageBinding.entryToObject(value); - - return mdd; - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e); - } - } - - /** - * Fills the provided ByteBuffer with as much content for the specified message as possible, starting - * from the specified offset in the message. - * - * @param messageId The message to get the data for. - * @param offset The offset of the data within the message. - * @param dst The destination of the content read back - * - * @return The number of bytes inserted into the destination - * - * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException - { - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - DatabaseEntry value = new DatabaseEntry(); - ContentBinding contentTupleBinding = ContentBinding.getInstance(); - - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); - } - - try - { - - int written = 0; - OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); - if (status == OperationStatus.SUCCESS) - { - byte[] dataAsBytes = contentTupleBinding.entryToObject(value); - int size = dataAsBytes.length; - if (offset > size) - { - throw new RuntimeException("Offset " + offset + " is greater than message size " + size - + " for message id " + messageId + "!"); - - } - - written = size - offset; - if(written > dst.remaining()) - { - written = dst.remaining(); - } - - dst.put(dataAsBytes, offset, written); - } - return written; - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - @Override - public boolean isPersistent() - { - return true; - } - - @Override - @SuppressWarnings("unchecked") - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) - { - if(metaData.isPersistent()) - { - return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData); - } - else - { - return new StoredMemoryMessage(getNewMessageId(), metaData); - } - } - - private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing configured object record: " + configuredObject); - } - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); - uuidBinding.objectToEntry(configuredObject.getId(), key); - - DatabaseEntry value = new DatabaseEntry(); - ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); - - queueBinding.objectToEntry(configuredObject, value); - try - { - OperationStatus status = getConfiguredObjectsDb().put(txn, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error writing configured object " + configuredObject + " to database: " - + status); - } - writeHierarchyRecords(txn, configuredObject); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject - + " to database: " + e.getMessage(), e); - } - } - - private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject) - { - OperationStatus status; - HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance(); - DatabaseEntry hierarchyKey = new DatabaseEntry(); - DatabaseEntry hierarchyValue = new DatabaseEntry(); - - for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet()) - { - - hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey); - UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue); - status = getConfiguredObjectHierarchyDb().put(txn, hierarchyKey, hierarchyValue); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: " - + status); - } - } - } - - private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException - { - UUID id = record.getId(); - Map<String, ConfiguredObjectRecord> parents = record.getParents(); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removing configured object: " + id); - } - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); - uuidBinding.objectToEntry(id, key); - OperationStatus status = getConfiguredObjectsDb().delete(tx, key); - if(status == OperationStatus.SUCCESS) - { - for(String parentType : parents.keySet()) - { - DatabaseEntry hierarchyKey = new DatabaseEntry(); - HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance(); - keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey); - getConfiguredObjectHierarchyDb().delete(tx, hierarchyKey); - } - } - return status; - } - - private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData> - { - - private final long _messageId; - private final boolean _isRecovered; - - private StorableMessageMetaData _metaData; - private volatile SoftReference<StorableMessageMetaData> _metaDataRef; - - private byte[] _data; - private volatile SoftReference<byte[]> _dataRef; - - StoredBDBMessage(long messageId, StorableMessageMetaData metaData) - { - this(messageId, metaData, false); - } - - StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered) - { - _messageId = messageId; - _isRecovered = isRecovered; - - if(!_isRecovered) - { - _metaData = metaData; - } - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - } - - @Override - public StorableMessageMetaData getMetaData() - { - StorableMessageMetaData metaData = _metaDataRef.get(); - if(metaData == null) - { - checkMessageStoreOpen(); - - metaData = BDBMessageStore.this.getMessageMetaData(_messageId); - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - } - - return metaData; - } - - @Override - public long getMessageNumber() - { - return _messageId; - } - - @Override - public void addContent(int offsetInMessage, java.nio.ByteBuffer src) - { - src = src.slice(); - - if(_data == null) - { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - src.duplicate().get(_data); - } - else - { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - - System.arraycopy(oldData,0,_data,0,oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); - } - - } - - @Override - public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) - { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; - } - else - { - checkMessageStoreOpen(); - - return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); - } - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - return ByteBuffer.wrap(data,offsetInMessage,size); - } - else - { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.limit(length); - buf.position(0); - return buf; - } - } - - synchronized void store(com.sleepycat.je.Transaction txn) - { - if (!stored()) - { - try - { - _dataRef = new SoftReference<byte[]>(_data); - BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); - BDBMessageStore.this.addContent(txn, _messageId, 0, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); - } - finally - { - _metaData = null; - _data = null; - } - } - } - - @Override - public synchronized StoreFuture flushToStore() - { - if(!stored()) - { - checkMessageStoreOpen(); - - com.sleepycat.je.Transaction txn; - try - { - txn = _environmentFacade.beginTransaction(); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("failed to begin transaction", e); - } - store(txn); - _environmentFacade.commit(txn); - _committer.commit(txn, true); - - storedSizeChangeOccured(getMetaData().getContentSize()); - } - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void remove() - { - checkMessageStoreOpen(); - - int delta = getMetaData().getContentSize(); - BDBMessageStore.this.removeMessage(_messageId, false); - storedSizeChangeOccured(-delta); - } - - private boolean stored() - { - return _metaData == null || _isRecovered; - } - } - - private class BDBTransaction implements org.apache.qpid.server.store.Transaction - { - private com.sleepycat.je.Transaction _txn; - private int _storeSizeIncrease; - - private BDBTransaction() throws StoreException - { - _txn = _environmentFacade.beginTransaction(); - } - - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException - { - checkMessageStoreOpen(); - - if(message.getStoredMessage() instanceof StoredBDBMessage) - { - final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); - } - - BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); - } - - @Override - public void commitTran() throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.commitTranImpl(_txn, true); - BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease); - } - - @Override - public StoreFuture commitTranAsync() throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease); - return BDBMessageStore.this.commitTranImpl(_txn, false); - } - - @Override - public void abortTran() throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.abortTran(_txn); - } - - @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.removeXid(_txn, format, globalId, branchId); - } - - @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, - Record[] dequeues) throws StoreException - { - checkMessageStoreOpen(); - - BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); - } - } - - private void storedSizeChangeOccured(final int delta) throws StoreException - { - try - { - storedSizeChange(delta); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Stored size change exception", e); - } - } - - private void storedSizeChange(final int delta) - { - if(getPersistentSizeHighThreshold() > 0) - { - synchronized (this) - { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 2*delta; - - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - _totalStoreSize = getSizeOnDisk(); - - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - } - else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) - { - long oldSize = _totalStoreSize; - _totalStoreSize = getSizeOnDisk(); - - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(); - - _totalStoreSize = getSizeOnDisk(); - - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - - } - } - } - } - - private void checkConfigurationStoreOpen() - { - if (!_configurationStoreOpen.get()) - { - throw new IllegalStateException("Configuration store is not open"); - } - } - - private void checkMessageStoreOpen() - { - if (!_messageStoreOpen.get()) - { - throw new IllegalStateException("Message store is not open"); - } - } - - private void reduceSizeOnDisk() - { - _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); - boolean cleaned = false; - while (_environmentFacade.getEnvironment().cleanLog() > 0) - { - cleaned = true; - } - if (cleaned) - { - CheckpointConfig force = new CheckpointConfig(); - force.setForce(true); - _environmentFacade.getEnvironment().checkpoint(force); - } - - - _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); - } - - private long getSizeOnDisk() - { - return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize(); - } - - private long getPersistentSizeLowThreshold() - { - return _persistentSizeLowThreshold; - } - - private long getPersistentSizeHighThreshold() - { - return _persistentSizeHighThreshold; - } - - - @Override - public void onDelete() - { - if (!_configurationStoreOpen.get() && !_messageStoreOpen.get()) - { - if (_storeLocation != null) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleting store " + _storeLocation); - } - - File location = new File(_storeLocation); - if (location.exists()) - { - if (!FileUtils.delete(location, true)) - { - LOGGER.error("Cannot delete " + _storeLocation); - } - } - } - } - } - - private Database getConfiguredObjectsDb() - { - return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); - } - - private Database getConfiguredObjectHierarchyDb() - { - return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME); - } - - private Database getMessageContentDb() - { - return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME); - } - - private Database getMessageMetaDataDb() - { - return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME); - } - - private Database getDeliveryDb() - { - return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME); - } - - private Database getXidDb() - { - return _environmentFacade.getOpenDatabase(XID_DB_NAME); - } - - class UpgradeTask implements EnvironmentFacadeTask - { - private final ConfiguredObject<?> _parent; - - public UpgradeTask(ConfiguredObject<?> parent) - { - _parent = parent; - } - - @Override - public void execute(EnvironmentFacade facade) - { - try - { - new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary(); - } - catch(DatabaseException e) - { - throw facade.handleDatabaseException("Cannot upgrade store", e); - } - } - } - - class OpenDatabasesTask implements EnvironmentFacadeTask - { - private String[] _names; - - public OpenDatabasesTask(String[] names) - { - _names = names; - } - - @Override - public void execute(EnvironmentFacade facade) - { - try - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - facade.openDatabases(dbConfig, _names); - } - catch(DatabaseException e) - { - throw facade.handleDatabaseException("Cannot open databases", e); - } - } - - } - - class DiskSpaceTask implements EnvironmentFacadeTask - { - - @Override - public void execute(EnvironmentFacade facade) - { - try - { - _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize(); - } - catch(DatabaseException e) - { - throw facade.handleDatabaseException("Cannot evaluate disk store size", e); - } - } - - } - - public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler - { - private long _maxId; - - @Override - public void execute(EnvironmentFacade facade) - { - visitMessagesInternal(this, facade); - _messageId.set(_maxId); - } - - @Override - public boolean handle(StoredMessage<?> storedMessage) - { - long id = storedMessage.getMessageNumber(); - if (_maxId<id) - { - _maxId = id; - } - return true; - } - - } - - @Override - public void visitMessages(MessageHandler handler) throws StoreException - { - checkMessageStoreOpen(); - visitMessagesInternal(handler, _environmentFacade); - } - - private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade) - { - Cursor cursor = null; - try - { - cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - long messageId = LongBinding.entryToLong(key); - StorableMessageMetaData metaData = valueBinding.entryToObject(value); - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); - - if (!handler.handle(message)) - { - break; - } - } - } - catch (DatabaseException e) - { - throw environmentFacade.handleDatabaseException("Cannot recover messages", e); - } - finally - { - if (cursor != null) - { - try - { - cursor.close(); - } - catch(DatabaseException e) - { - throw environmentFacade.handleDatabaseException("Cannot close cursor", e); - } - } - } - } - - @Override - public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - QueueEntryKey entry = keyBinding.entryToObject(key); - entries.add(entry); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e); - } - finally - { - closeCursorSafely(cursor); - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - if (!handler.handle(queueId, messageId)) - { - break; - } - } - - } - - @Override - public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - try - { - cursor = getXidDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), - preparedTransaction.getEnqueues(),preparedTransaction.getDequeues())) - { - break; - } - } - - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e); - } - finally - { - closeCursorSafely(cursor); - } - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index 2d783fa181..067372a228 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -42,13 +42,13 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi @Override public DurableConfigurationStore createDurableConfigurationStore() { - return new BDBMessageStore(); + return new BDBConfigurationStore(); } @Override public MessageStore createMessageStore() { - return new BDBMessageStore(); + return (new BDBConfigurationStore()).getMessageStore(); } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index e80d60609f..1be7e97a47 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -27,9 +27,8 @@ import java.lang.reflect.InvocationTargetException; import org.apache.log4j.Logger; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; @@ -70,7 +69,7 @@ public class Upgrader if(versionDb.count() == 0L) { - int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion(); + int sourceVersion = isEmpty ? BDBConfigurationStore.VERSION: identifyOldStoreVersion(); DatabaseEntry key = new DatabaseEntry(); IntegerBinding.intToEntry(sourceVersion, key); DatabaseEntry value = new DatabaseEntry(); @@ -86,11 +85,11 @@ public class Upgrader LOGGER.debug("Source message store version is " + version); } - if(version > BDBMessageStore.VERSION) + if(version > BDBConfigurationStore.VERSION) { throw new StoreException("Database version " + version + " is higher than the most recent known version: " - + BDBMessageStore.VERSION); + + BDBConfigurationStore.VERSION); } performUpgradeFromVersion(version, versionDb); } @@ -139,7 +138,7 @@ public class Upgrader void performUpgradeFromVersion(int sourceVersion, Database versionDb) throws StoreException { - while(sourceVersion != BDBMessageStore.VERSION) + while(sourceVersion != BDBConfigurationStore.VERSION) { upgrade(sourceVersion, ++sourceVersion); DatabaseEntry key = new DatabaseEntry(); @@ -191,7 +190,7 @@ public class Upgrader private int identifyOldStoreVersion() throws DatabaseException { - int version = BDBMessageStore.VERSION; + int version = BDBConfigurationStore.VERSION; for (String databaseName : _environment.getDatabaseNames()) { if (databaseName.contains("_v")) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java index 974ff51a3b..af82d9fd2d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -31,7 +31,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @@ -42,7 +42,7 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm { public static final String TYPE = "BDB_HA"; - private final BDBMessageStore _messageStore; + private final BDBConfigurationStore _configurationStore; private MessageStoreLogSubject _messageStoreLogSubject; @ManagedAttributeField(afterSet="setLocalTransactionSynchronizationPolicyOnEnvironment") @@ -56,8 +56,8 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm { super(attributes, virtualHostNode); - _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore(); - _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); + _configurationStore = (BDBConfigurationStore) virtualHostNode.getConfigurationStore(); + _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _configurationStore.getMessageStore().getClass().getSimpleName()); } @Override @@ -68,13 +68,13 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm @Override public DurableConfigurationStore getDurableConfigurationStore() { - return _messageStore; + return _configurationStore; } @Override public MessageStore getMessageStore() { - return _messageStore; + return _configurationStore.getMessageStore(); } @Override @@ -176,7 +176,7 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade() { - return (ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade(); + return (ReplicatedEnvironmentFacade) _configurationStore.getEnvironmentFacade(); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 91ac63ff46..e36def581b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -42,7 +42,6 @@ import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; import com.sleepycat.je.rep.utilint.HostPortPair; - import org.apache.log4j.Logger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; @@ -59,7 +58,7 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener; @@ -265,9 +264,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override - public BDBMessageStore getConfigurationStore() + public BDBConfigurationStore getConfigurationStore() { - return (BDBMessageStore) super.getConfigurationStore(); + return (BDBConfigurationStore) super.getConfigurationStore(); } protected ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade() @@ -278,7 +277,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override protected DurableConfigurationStore createConfigurationStore() { - return new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); + return new BDBConfigurationStore(new ReplicatedEnvironmentFacadeFactory()); } @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 962e19f81c..0254cbc909 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -165,8 +165,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase DurableConfigurationStore store = node.getConfigurationStore(); assertNotNull(store); - BDBMessageStore bdbMessageStore = (BDBMessageStore) store; - ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbMessageStore.getEnvironmentFacade().getEnvironment(); + BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) store; + ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment(); ReplicationConfig replicationConfig = environment.getRepConfig(); assertEquals(nodeName, environment.getNodeName()); @@ -181,7 +181,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost(); assertNotNull("Virtual host child was not added", virtualHost); assertEquals("Unexpected virtual host name", groupName, virtualHost.getName()); - assertEquals("Unexpected virtual host store", store, virtualHost.getMessageStore()); + assertEquals("Unexpected virtual host store", bdbConfigurationStore.getMessageStore(), virtualHost.getMessageStore()); assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState()); node.stop(); @@ -212,8 +212,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase BDBHAVirtualHostNode<?> node = createAndStartHaVHN(attributes); - BDBMessageStore bdbMessageStore = (BDBMessageStore) node.getConfigurationStore(); - ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbMessageStore.getEnvironmentFacade().getEnvironment(); + BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) node.getConfigurationStore(); + ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment(); assertEquals("Unexpected node priority value before mutation", 1, environment.getRepMutableConfig().getNodePriority()); assertFalse("Unexpected designated primary value before mutation", environment.getRepMutableConfig().getDesignatedPrimary()); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java index e1678e6f65..8657a3a0b1 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -28,6 +28,6 @@ public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurati @Override protected DurableConfigurationStore createConfigStore() throws Exception { - return new BDBMessageStore(); + return new BDBConfigurationStore(); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index f2de01445d..2afdaa4dd5 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -74,7 +74,7 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB @Override protected MessageStore createStore() throws Exception { - MessageStore store = new BDBMessageStore(); + MessageStore store = (new BDBConfigurationStore()).getMessageStore(); return store; } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 6fba1b215e..d623da846c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -50,6 +50,7 @@ import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.util.FileUtils; +import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore.BDBMessageStore; /** * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against @@ -81,7 +82,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase */ public void testBDBMessagePersistence() throws Exception { - BDBMessageStore bdbStore = (BDBMessageStore)getStore(); + MessageStore bdbStore = getStore(); // Create content ByteBuffers. // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. @@ -342,7 +343,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase */ public void testMessageCreationAndRemoval() throws Exception { - BDBMessageStore bdbStore = (BDBMessageStore)getStore(); + BDBMessageStore bdbStore = (BDBMessageStore) getStore(); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -430,7 +431,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return new BDBMessageStore(); + return new BDBConfigurationStore().getMessageStore(); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java index c407be50c6..5b869b67dc 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java @@ -26,7 +26,8 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.OperationStatus; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; + +import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase @@ -94,7 +95,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase catch(ServerScopedRuntimeException ex) { assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: " - + BDBMessageStore.VERSION, ex.getMessage()); + + BDBConfigurationStore.VERSION, ex.getMessage()); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index 4b9a8d19a8..54dd08fd98 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -24,7 +24,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import com.sleepycat.bind.tuple.IntegerBinding; @@ -95,7 +95,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase { assertEquals("Unexpected store version", -1, getStoreVersion(_environment)); _upgrader.upgradeIfNecessary(); - assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion(_environment)); + assertEquals("Unexpected store version", BDBConfigurationStore.VERSION, getStoreVersion(_environment)); assertContent(); } @@ -115,7 +115,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase List<String> expectedDatabases = new ArrayList<String>(); expectedDatabases.add(Upgrader.VERSION_DB_NAME); assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames); - assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion(emptyEnvironment)); + assertEquals("Unexpected store version", BDBConfigurationStore.VERSION, getStoreVersion(emptyEnvironment)); } finally diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index e0c1f77d2b..1db2c89d05 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -46,15 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonParseException; @@ -67,7 +58,17 @@ import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializerProvider; import org.codehaus.jackson.map.module.SimpleModule; -abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; + +abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, DurableConfigurationStore { private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION"; @@ -369,7 +370,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) { if (_messageStoreOpen.compareAndSet(false, true)) @@ -952,7 +952,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - @Override public void closeMessageStore() { if (_messageStoreOpen.compareAndSet(true, false)) @@ -978,7 +977,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected abstract void doClose(); - @Override public StoredMessage addMessage(StorableMessageMetaData metaData) { checkMessageStoreOpen(); @@ -1132,7 +1130,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected abstract Connection getConnection() throws SQLException; - @Override public Transaction newTransaction() { checkMessageStoreOpen(); @@ -1665,7 +1662,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - @Override public boolean isPersistent() { return true; @@ -1975,7 +1971,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void addEventListener(EventListener eventListener, Event... events) { _eventManager.addEventListener(eventListener, events); @@ -2250,7 +2245,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void visitMessages(MessageHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2301,7 +2295,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2346,7 +2339,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2447,8 +2439,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected abstract void storedSizeChange(int storeSizeIncrease); - - @Override public void onDelete() { // TODO should probably check we are closed diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java index e2c43f5012..e69de29bb2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java @@ -1,371 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.Transaction.Record; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; - -/** A simple message store that stores the messages in a thread-safe structure in memory. */ -abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore -{ - private final class MemoryMessageStoreTransaction implements Transaction - { - private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>(); - private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>(); - - private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>(); - private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>(); - - @Override - public StoreFuture commitTranAsync() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - Set<Long> messageIds = _localEnqueueMap.get(queue.getId()); - if (messageIds == null) - { - messageIds = new HashSet<Long>(); - _localEnqueueMap.put(queue.getId(), messageIds); - } - messageIds.add(message.getMessageNumber()); - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - Set<Long> messageIds = _localDequeueMap.get(queue.getId()); - if (messageIds == null) - { - messageIds = new HashSet<Long>(); - _localDequeueMap.put(queue.getId(), messageIds); - } - messageIds.add(message.getMessageNumber()); - } - - @Override - public void commitTran() - { - commitTransactionInternal(this); - _localEnqueueMap.clear(); - _localDequeueMap.clear(); - } - - @Override - public void abortTran() - { - _localEnqueueMap.clear(); - _localDequeueMap.clear(); - } - - @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId)); - } - - @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues)); - } - } - - private final AtomicLong _messageId = new AtomicLong(1); - - private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>(); - - protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>(); - - private Object _transactionLock = new Object(); - private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>(); - private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>(); - - @SuppressWarnings("unchecked") - @Override - public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData) - { - long id = _messageId.getAndIncrement(); - - if(metaData.isPersistent()) - { - return new StoredMemoryMessage(id, metaData) - { - - @Override - public StoreFuture flushToStore() - { - _messages.putIfAbsent(getMessageNumber(), this) ; - return super.flushToStore(); - } - - @Override - public void remove() - { - _messages.remove(getMessageNumber()); - super.remove(); - } - - }; - } - else - { - return new StoredMemoryMessage(id, metaData); - } - } - - private void commitTransactionInternal(MemoryMessageStoreTransaction transaction) - { - synchronized (_transactionLock ) - { - for (Map.Entry<UUID, Set<Long>> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet()) - { - Set<Long> messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey()); - if (messageIds == null) - { - messageIds = new HashSet<Long>(); - _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds); - } - messageIds.addAll(loacalEnqueuedEntry.getValue()); - } - - for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet()) - { - Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey()); - if (messageIds != null) - { - messageIds.removeAll(loacalDequeueEntry.getValue()); - if (messageIds.isEmpty()) - { - _messageInstances.remove(loacalDequeueEntry.getKey()); - } - } - } - - for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet()) - { - _distributedTransactions.put(entry.getKey(), entry.getValue()); - } - - for (Xid removed : transaction._localDistributedTransactionsRemoves) - { - _distributedTransactions.remove(removed); - } - - } - - - } - - @Override - public Transaction newTransaction() - { - return new MemoryMessageStoreTransaction(); - } - - @Override - public boolean isPersistent() - { - return false; - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - } - - @Override - public void create(ConfiguredObjectRecord record) - { - if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null) - { - throw new StoreException("Record with id " + record.getId() + " is already present"); - } - } - - @Override - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) - { - for (ConfiguredObjectRecord record : records) - { - ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record); - if (previousValue == null && !createIfNecessary) - { - throw new StoreException("Record with id " + record.getId() + " does not exist"); - } - } - } - - @Override - public UUID[] remove(final ConfiguredObjectRecord... objects) - { - List<UUID> removed = new ArrayList<UUID>(); - for (ConfiguredObjectRecord record : objects) - { - if (_configuredObjectRecords.remove(record.getId()) != null) - { - removed.add(record.getId()); - } - } - return removed.toArray(new UUID[removed.size()]); - } - - @Override - public void closeConfigurationStore() - { - _configuredObjectRecords.clear(); - } - - @Override - public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) - { - } - - @Override - public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException - { - handler.begin(); - for (ConfiguredObjectRecord record : _configuredObjectRecords.values()) - { - if (!handler.handle(record)) - { - break; - } - } - handler.end(); - } - - @Override - public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) - { - } - - @Override - public void closeMessageStore() - { - _messages.clear(); - synchronized (_transactionLock) - { - _messageInstances.clear(); - _distributedTransactions.clear(); - } - } - - @Override - public String getStoreLocation() - { - return null; - } - - @Override - public void onDelete() - { - } - - @Override - public void visitMessages(MessageHandler handler) throws StoreException - { - for (StoredMemoryMessage message : _messages.values()) - { - if(!handler.handle(message)) - { - break; - } - } - } - - @Override - public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException - { - synchronized (_transactionLock) - { - for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet()) - { - UUID resourceId = enqueuedEntry.getKey(); - for (Long messageId : enqueuedEntry.getValue()) - { - if (!handler.handle(resourceId, messageId)) - { - return; - } - } - } - } - } - - @Override - public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException - { - synchronized (_transactionLock) - { - for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet()) - { - Xid xid = entry.getKey(); - DistributedTransactionRecords records = entry.getValue(); - if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues())) - { - break; - } - } - } - } - - private static final class DistributedTransactionRecords - { - private Record[] _enqueues; - private Record[] _dequeues; - - public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues) - { - super(); - _enqueues = enqueues; - _dequeues = dequeues; - } - - public Record[] getEnqueues() - { - return _enqueues; - } - - public Record[] getDequeues() - { - return _dequeues; - } - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java new file mode 100644 index 0000000000..267e1d9cb3 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; + +abstract class AbstractMemoryStore implements DurableConfigurationStore, MessageStoreProvider +{ + private final MessageStore _messageStore = new MemoryMessageStore(); + + + private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>(); + + + + @Override + public void create(ConfiguredObjectRecord record) + { + if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null) + { + throw new StoreException("Record with id " + record.getId() + " is already present"); + } + } + + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) + { + for (ConfiguredObjectRecord record : records) + { + ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record); + if (previousValue == null && !createIfNecessary) + { + throw new StoreException("Record with id " + record.getId() + " does not exist"); + } + } + } + + @Override + public UUID[] remove(final ConfiguredObjectRecord... objects) + { + List<UUID> removed = new ArrayList<UUID>(); + for (ConfiguredObjectRecord record : objects) + { + if (_configuredObjectRecords.remove(record.getId()) != null) + { + removed.add(record.getId()); + } + } + return removed.toArray(new UUID[removed.size()]); + } + + @Override + public void closeConfigurationStore() + { + _configuredObjectRecords.clear(); + } + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + { + } + + @Override + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException + { + handler.begin(); + for (ConfiguredObjectRecord record : _configuredObjectRecords.values()) + { + if (!handler.handle(record)) + { + break; + } + } + handler.end(); + } + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java new file mode 100644 index 0000000000..44d640ca86 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -0,0 +1,308 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; + +/** A simple message store that stores the messages in a thread-safe structure in memory. */ +public class MemoryMessageStore implements MessageStore +{ + private final AtomicLong _messageId = new AtomicLong(1); + + private final ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>(); + private final Object _transactionLock = new Object(); + private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>(); + private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>(); + + + private final class MemoryMessageStoreTransaction implements Transaction + { + private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>(); + private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>(); + + private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>(); + private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>(); + + @Override + public StoreFuture commitTranAsync() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) + { + Set<Long> messageIds = _localEnqueueMap.get(queue.getId()); + if (messageIds == null) + { + messageIds = new HashSet<Long>(); + _localEnqueueMap.put(queue.getId(), messageIds); + } + messageIds.add(message.getMessageNumber()); + } + + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) + { + Set<Long> messageIds = _localDequeueMap.get(queue.getId()); + if (messageIds == null) + { + messageIds = new HashSet<Long>(); + _localDequeueMap.put(queue.getId(), messageIds); + } + messageIds.add(message.getMessageNumber()); + } + + @Override + public void commitTran() + { + commitTransactionInternal(this); + _localEnqueueMap.clear(); + _localDequeueMap.clear(); + } + + @Override + public void abortTran() + { + _localEnqueueMap.clear(); + _localDequeueMap.clear(); + } + + @Override + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId)); + } + + @Override + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues)); + } + } + + private static final class DistributedTransactionRecords + { + private Transaction.Record[] _enqueues; + private Transaction.Record[] _dequeues; + + public DistributedTransactionRecords(Transaction.Record[] enqueues, Transaction.Record[] dequeues) + { + super(); + _enqueues = enqueues; + _dequeues = dequeues; + } + + public Transaction.Record[] getEnqueues() + { + return _enqueues; + } + + public Transaction.Record[] getDequeues() + { + return _dequeues; + } + } + + private void commitTransactionInternal(MemoryMessageStoreTransaction transaction) + { + synchronized (_transactionLock ) + { + for (Map.Entry<UUID, Set<Long>> localEnqueuedEntry : transaction._localEnqueueMap.entrySet()) + { + Set<Long> messageIds = _messageInstances.get(localEnqueuedEntry.getKey()); + if (messageIds == null) + { + messageIds = new HashSet<Long>(); + _messageInstances.put(localEnqueuedEntry.getKey(), messageIds); + } + messageIds.addAll(localEnqueuedEntry.getValue()); + } + + for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet()) + { + Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey()); + if (messageIds != null) + { + messageIds.removeAll(loacalDequeueEntry.getValue()); + if (messageIds.isEmpty()) + { + _messageInstances.remove(loacalDequeueEntry.getKey()); + } + } + } + + for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet()) + { + _distributedTransactions.put(entry.getKey(), entry.getValue()); + } + + for (Xid removed : transaction._localDistributedTransactionsRemoves) + { + _distributedTransactions.remove(removed); + } + + } + + + } + + + @Override + public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) + { + long id = _messageId.getAndIncrement(); + + if(metaData.isPersistent()) + { + return new StoredMemoryMessage<T>(id, metaData) + { + + @Override + public StoreFuture flushToStore() + { + _messages.putIfAbsent(getMessageNumber(), this) ; + return super.flushToStore(); + } + + @Override + public void remove() + { + _messages.remove(getMessageNumber()); + super.remove(); + } + + }; + } + else + { + return new StoredMemoryMessage<T>(id, metaData); + } + } + + @Override + public boolean isPersistent() + { + return false; + } + + @Override + public Transaction newTransaction() + { + return new MemoryMessageStoreTransaction(); + } + + @Override + public void closeMessageStore() + { + _messages.clear(); + synchronized (_transactionLock) + { + _messageInstances.clear(); + _distributedTransactions.clear(); + } + } + + @Override + public void addEventListener(final EventListener eventListener, final Event... events) + { + } + + @Override + public String getStoreLocation() + { + return null; + } + + @Override + public void onDelete() + { + } + + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + for (StoredMemoryMessage message : _messages.values()) + { + if(!handler.handle(message)) + { + break; + } + } + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + synchronized (_transactionLock) + { + for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet()) + { + UUID resourceId = enqueuedEntry.getKey(); + for (Long messageId : enqueuedEntry.getValue()) + { + if (!handler.handle(resourceId, messageId)) + { + return; + } + } + } + } + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + synchronized (_transactionLock) + { + for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet()) + { + Xid xid = entry.getKey(); + DistributedTransactionRecords records = entry.getValue(); + if (!handler.handle(xid.getFormat(), + xid.getGlobalId(), + xid.getBranchId(), + records.getEnqueues(), + records.getDequeues())) + { + break; + } + } + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java new file mode 100644 index 0000000000..94d58013d2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public interface MessageStoreProvider +{ + MessageStore getMessageStore(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index e7302270bb..f4e1376980 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -23,13 +23,13 @@ package org.apache.qpid.server.store; import java.nio.ByteBuffer; -public class StoredMemoryMessage implements StoredMessage +public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T> { private final long _messageNumber; private ByteBuffer _content; - private final StorableMessageMetaData _metaData; + private final T _metaData; - public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData) + public StoredMemoryMessage(long messageNumber, T metaData) { _messageNumber = messageNumber; _metaData = metaData; @@ -128,7 +128,7 @@ public class StoredMemoryMessage implements StoredMessage } - public StorableMessageMetaData getMetaData() + public T getMetaData() { return _metaData; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index d27cd1c13e..9e10d5e424 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -77,6 +77,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; @@ -196,13 +197,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte DurableConfigurationStore durableConfigurationStore = _virtualHostNode.getConfigurationStore(); + // TODO attribute messageStoreProvider is to be removed boolean nodeIsMessageStoreProvider = _virtualHostNode.isMessageStoreProvider(); if (nodeIsMessageStoreProvider) { - if (!(durableConfigurationStore instanceof MessageStore)) + if (!(durableConfigurationStore instanceof MessageStoreProvider)) { throw new IllegalConfigurationException("Virtual host node " + _virtualHostNode.getName() - + " is configured as a provider of message store but the MessageStore interface is not implemented on a configuration store of type " + + " is configured as a provider of message store but the MessageStoreProvider interface is not implemented on a configuration store of type " + durableConfigurationStore.getClass().getName()); } } @@ -215,7 +217,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte + ". You can either configure the message store setting on the host or " + (durableConfigurationStore instanceof MessageStore ? " configure VirtualHostNode " + _virtualHostNode.getName() + " as a provider of message store" : - " change the node type to one having configuration store implementing the MessageStore inteface") ); + " change the node type to one having configuration store implementing the MessageStore interface") ); } String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE); if (storeType == null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index 62e545659b..b9356b368b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; @ManagedObject( category = false, type = "STANDARD") public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost> @@ -96,7 +97,7 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class); if (virtualHostNode.isMessageStoreProvider()) { - _messageStore = (MessageStore)virtualHostNode.getConfigurationStore(); + _messageStore = ((MessageStoreProvider)virtualHostNode.getConfigurationStore()).getMessageStore(); } else { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index f52558e298..c87d24f9c6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<X>> extends AbstractConfiguredObject<X> implements VirtualHostNode<X> { @@ -198,9 +199,9 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< deleteVirtualHostIfExists(); close(); deleted(); - if (getConfigurationStore() instanceof MessageStore) + if (getConfigurationStore() instanceof MessageStoreProvider) { - ((MessageStore)getConfigurationStore()).onDelete(); + ((MessageStoreProvider)getConfigurationStore()).getMessageStore().onDelete(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java new file mode 100644 index 0000000000..4cb85dafd4 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.MessageStore; + +@ManagedObject( category = false ) +public interface MessageStoreProvidingVirtualHostNode<X extends MessageStoreProvidingVirtualHostNode<X>> + extends VirtualHostNode<X> +{ + MessageStore getProvidedMessageStore(); +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java new file mode 100644 index 0000000000..8fd3cbb1fe --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Collections; +import java.util.Map; + +public class MemoryMessageStoreTest extends MessageStoreTestCase +{ + + @Override + protected Map<String, Object> getStoreSettings() throws Exception + { + return Collections.<String, Object>emptyMap(); + } + + @Override + protected MessageStore createMessageStore() + { + return new MemoryMessageStore(); + } + + @Override + protected void reopenStore() throws Exception + { + // cannot re-open memory message store as it is not persistent + } + +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index bfa4e1d52e..12b21fa964 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.server.store.handler.MessageHandler; -/** A simple message store that stores the messages in a thread-safe structure in memory. */ -public class TestMemoryMessageStore extends AbstractMemoryMessageStore +/** + * A simple message store that stores the messages in a thread-safe structure in memory. + */ +public class TestMemoryMessageStore extends MemoryMessageStore { public static final String TYPE = "TestMemory"; @@ -34,15 +36,14 @@ public class TestMemoryMessageStore extends AbstractMemoryMessageStore { final AtomicInteger counter = new AtomicInteger(); visitMessages(new MessageHandler() - { - - @Override - public boolean handle(StoredMessage<?> storedMessage) - { - counter.incrementAndGet(); - return true; - } - }); + { + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + counter.incrementAndGet(); + return true; + } + }); return counter.get(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java index d7c11ea226..df17884495 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java @@ -49,7 +49,9 @@ public class TestMemoryMessageStoreFactory implements MessageStoreFactory, Durab @Override public DurableConfigurationStore createDurableConfigurationStore() { - return new TestMemoryMessageStore(); + return new AbstractMemoryStore() + { + }; } @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index e1e37ad3bd..47456f2675 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -21,6 +21,10 @@ package org.apache.qpid.server.protocol.v0_8; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Set; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -39,10 +43,6 @@ import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.Set; - /** * Tests that acknowledgements are handled correctly. */ @@ -90,8 +90,6 @@ public class AckTest extends QpidTestCase { for (int i = 1; i <= count; i++) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. MessagePublishInfo publishBody = new MessagePublishInfo() { @@ -130,13 +128,15 @@ public class AckTest extends QpidTestCase b.setDeliveryMode((byte) 2); } - // we increment the reference here since we are not delivering the messaging to any queues, which is where - // the reference is normally incremented. The test is easier to construct if we have direct access to the - // subscription + // The test is easier to construct if we have direct access to the subscription ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); - MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis()); - final StoredMessage storedMessage = _messageStore.addMessage(mmd); + + final MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis()); + + final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd); + + final StoredMessage storedMessage = result; final AMQMessage message = new AMQMessage(storedMessage); ServerTransaction txn = new AutoCommitTransaction(_messageStore); txn.enqueue(_queue, message, diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index e9c37e7b42..69b3069ddb 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -81,7 +81,8 @@ public class ReferenceCountingTest extends QpidTestCase - MessageMetaData mmd = new MessageMetaData(info, chb); + final MessageMetaData mmd = new MessageMetaData(info, chb); + StoredMessage storedMessage = _store.addMessage(mmd); storedMessage.flushToStore(); @@ -139,7 +140,8 @@ public class ReferenceCountingTest extends QpidTestCase final ContentHeaderBody chb = createPersistentContentHeader(); - MessageMetaData mmd = new MessageMetaData(info, chb); + final MessageMetaData mmd = new MessageMetaData(info, chb); + StoredMessage storedMessage = _store.addMessage(mmd); storedMessage.flushToStore(); diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index d682076350..38b4c66ebe 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -35,11 +35,21 @@ import java.util.List; import java.util.Map; import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.AbstractJDBCMessageStore; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.util.FileUtils; /** @@ -47,7 +57,8 @@ import org.apache.qpid.util.FileUtils; * mechanism. * */ -public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore +public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider, + DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); @@ -72,6 +83,8 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa private String _storeLocation; private Class<Driver> _driverClass; + private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); + public DerbyMessageStore() { } @@ -239,8 +252,6 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa } } - - @Override public String getStoreLocation() { return _storeLocation; @@ -446,4 +457,81 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa { return DriverManager.getConnection(_connectionURL); } + + @Override + public MessageStore getMessageStore() + { + return _messageStoreFacade; + } + + private class MessageStoreWrapper implements MessageStore + { + + @Override + public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + DerbyMessageStore.this.openMessageStore(parent, messageStoreSettings); + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) + { + return DerbyMessageStore.this.addMessage(metaData); + } + + @Override + public boolean isPersistent() + { + return DerbyMessageStore.this.isPersistent(); + } + + @Override + public Transaction newTransaction() + { + return DerbyMessageStore.this.newTransaction(); + } + + @Override + public void closeMessageStore() + { + DerbyMessageStore.this.closeMessageStore(); + } + + @Override + public void addEventListener(final EventListener eventListener, final Event... events) + { + DerbyMessageStore.this.addEventListener(eventListener, events); + } + + @Override + public String getStoreLocation() + { + return DerbyMessageStore.this.getStoreLocation(); + } + + @Override + public void onDelete() + { + DerbyMessageStore.this.onDelete(); + } + + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + DerbyMessageStore.this.visitMessages(handler); + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + DerbyMessageStore.this.visitMessageInstances(handler); + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + DerbyMessageStore.this.visitDistributedTransactions(handler); + } + } + } diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java index 13c897135d..9bc3780a71 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -48,7 +48,7 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory, DurableCon @Override public MessageStore createMessageStore() { - return new DerbyMessageStore(); + return (new DerbyMessageStore()).getMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java index ba7ae26292..1d35b9ef83 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -50,7 +50,7 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes @Override protected MessageStore createStore() throws Exception { - return new DerbyMessageStore(); + return (new DerbyMessageStore()).getMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java index 9a2d945494..4594b7f223 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -83,7 +83,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return new DerbyMessageStore(); + return (new DerbyMessageStore()).getMessageStore(); } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index d70f2a3d78..ddafa83bb3 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -31,12 +31,22 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.util.MapValueConverter; /** @@ -44,7 +54,7 @@ import org.apache.qpid.server.util.MapValueConverter; * mechanism. * */ -public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore +public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider { private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); @@ -60,6 +70,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag protected String _connectionURL; private ConnectionProvider _connectionProvider; + private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); private static class JDBCDetails { @@ -331,7 +342,6 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag { } - @Override public String getStoreLocation() { return _connectionURL; @@ -428,4 +438,80 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } } + @Override + public MessageStore getMessageStore() + { + return _messageStoreFacade; + } + + private class MessageStoreWrapper implements MessageStore + { + + @Override + public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings); + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) + { + return JDBCMessageStore.this.addMessage(metaData); + } + + @Override + public boolean isPersistent() + { + return JDBCMessageStore.this.isPersistent(); + } + + @Override + public Transaction newTransaction() + { + return JDBCMessageStore.this.newTransaction(); + } + + @Override + public void closeMessageStore() + { + JDBCMessageStore.this.closeMessageStore(); + } + + @Override + public void addEventListener(final EventListener eventListener, final Event... events) + { + JDBCMessageStore.this.addEventListener(eventListener, events); + } + + @Override + public String getStoreLocation() + { + return JDBCMessageStore.this.getStoreLocation(); + } + + @Override + public void onDelete() + { + JDBCMessageStore.this.onDelete(); + } + + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + JDBCMessageStore.this.visitMessages(handler); + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + JDBCMessageStore.this.visitMessageInstances(handler); + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + JDBCMessageStore.this.visitDistributedTransactions(handler); + } + } + } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java index e1db859a98..ab7ac6c671 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java @@ -42,7 +42,7 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory, DurableConf @Override public MessageStore createMessageStore() { - return new JDBCMessageStore(); + return (new JDBCMessageStore()).getMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index 2322fa7102..1f03b7f75f 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return new JDBCMessageStore(); + return (new JDBCMessageStore()).getMessageStore(); } private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java new file mode 100644 index 0000000000..0b0e6705d5 --- /dev/null +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + + +/** A simple message store that stores the messages in a thread-safe structure in memory. */ +public class MemoryConfigurationStore extends AbstractMemoryStore +{ + public static final String TYPE = "Memory"; + +} diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index c8dd2e6e61..e69de29bb2 100644 --- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - - -/** A simple message store that stores the messages in a thread-safe structure in memory. */ -public class MemoryMessageStore extends AbstractMemoryMessageStore -{ - public static final String TYPE = "Memory"; - -} diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java index d5d5969a47..8148ff9371 100644 --- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java @@ -33,7 +33,7 @@ public class MemoryMessageStoreFactory implements MessageStoreFactory, DurableCo @Override public String getType() { - return MemoryMessageStore.TYPE; + return MemoryConfigurationStore.TYPE; } @Override @@ -50,7 +50,7 @@ public class MemoryMessageStoreFactory implements MessageStoreFactory, DurableCo @Override public DurableConfigurationStore createDurableConfigurationStore() { - return new MemoryMessageStore(); + return new MemoryConfigurationStore(); } @Override diff --git a/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java index 8fd3cbb1fe..e69de29bb2 100644 --- a/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java +++ b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.util.Collections; -import java.util.Map; - -public class MemoryMessageStoreTest extends MessageStoreTestCase -{ - - @Override - protected Map<String, Object> getStoreSettings() throws Exception - { - return Collections.<String, Object>emptyMap(); - } - - @Override - protected MessageStore createMessageStore() - { - return new MemoryMessageStore(); - } - - @Override - protected void reopenStore() throws Exception - { - // cannot re-open memory message store as it is not persistent - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 0993783e54..b6dd1b1b71 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -27,12 +27,12 @@ import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.model.ConfiguredObject; -public class QuotaMessageStore extends AbstractMemoryMessageStore +public class QuotaMessageStore extends MemoryMessageStore { public static final String TYPE = "QuotaMessageStore"; private final AtomicLong _messageId = new AtomicLong(1); - private long _totalStoreSize;; + private long _totalStoreSize; private boolean _limitBusted; private long _persistentSizeLowThreshold; private long _persistentSizeHighThreshold; @@ -66,12 +66,11 @@ public class QuotaMessageStore extends AbstractMemoryMessageStore } - @SuppressWarnings("unchecked") @Override - public StoredMessage<StorableMessageMetaData> addMessage(StorableMessageMetaData metaData) + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) { final long id = _messageId.getAndIncrement(); - return new StoredMemoryMessage(id, metaData); + return new StoredMemoryMessage<T>(id, metaData); } @Override diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index ba00c430ed..0a686926e3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -146,7 +146,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); } - final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryMessageStore.TYPE : messageStoreSettings.get(REAL_STORE); + final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryConfigurationStore.TYPE : messageStoreSettings.get(REAL_STORE); final String realStore = (String) realStoreAttr; _realMessageStore = MessageStoreFactory.FACTORY_LOADER.get(realStore).createMessageStore(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 1aee32db1b..e9f33a1658 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -67,7 +67,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MemoryConfigurationStore; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.FileUtils; import org.apache.qpid.util.SystemUtils; @@ -854,7 +854,7 @@ public class QpidBrokerTestCase extends QpidTestCase { storeDir = ":memory:"; } - else if (!MemoryMessageStore.TYPE.equals(storeType)) + else if (!MemoryConfigurationStore.TYPE.equals(storeType)) { storeDir = "${QPID_WORK}" + File.separator + virtualHostNodeName + File.separator + brokerPort; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java index e4d4e5b24e..0dcfbf2be3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java @@ -32,14 +32,13 @@ import java.util.Map; import java.util.UUID; import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.JsonFileConfigStore; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MemoryConfigurationStore; import org.apache.qpid.server.virtualhost.StandardVirtualHost; import org.apache.qpid.util.FileUtils; import org.apache.qpid.util.Strings; @@ -84,7 +83,7 @@ public class TestUtils config.setObjectAttribute(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHostNode.IS_MESSAGE_STORE_PROVIDER, false); // If using MMS, switch to split store with JSON config store. - if (MemoryMessageStore.TYPE.equals(configStoreType)) + if (MemoryConfigurationStore.TYPE.equals(configStoreType)) { configStoreType = JsonFileConfigStore.TYPE; } |
