diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/legacystore/MessageStoreImpl.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/legacystore/MessageStoreImpl.cpp')
-rw-r--r-- | cpp/src/qpid/legacystore/MessageStoreImpl.cpp | 1732 |
1 files changed, 1732 insertions, 0 deletions
diff --git a/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/cpp/src/qpid/legacystore/MessageStoreImpl.cpp new file mode 100644 index 0000000000..69e9f48a17 --- /dev/null +++ b/cpp/src/qpid/legacystore/MessageStoreImpl.cpp @@ -0,0 +1,1732 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/legacystore/MessageStoreImpl.h" + +#include "qpid/legacystore/BindingDbt.h" +#include "qpid/legacystore/BufferValue.h" +#include "qpid/legacystore/IdDbt.h" +#include "qpid/legacystore/jrnl/txn_map.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/log/Statement.h" +#include "qmf/org/apache/qpid/legacystore/Package.h" +#include "qpid/legacystore/StoreException.h" +#include <dirent.h> +#include <db.h> + +#define MAX_AIO_SLEEPS 100000 // tot: ~1 sec +#define AIO_SLEEP_TIME_US 10 // 0.01 ms + +namespace _qmf = qmf::org::apache::qpid::legacystore; + +namespace mrg { +namespace msgstore { + + +const std::string MessageStoreImpl::storeTopLevelDir("rhm"); // Sets the top-level store dir name +// FIXME aconway 2010-03-09: was 10 +qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms +qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s +qpid::sys::Mutex TxnCtxt::globalSerialiser; + +MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid, + const bool _deq_flag, + const bool _commit_flag, + const bool _tpc_flag) : + rid(_rid), + deq_flag(_deq_flag), + commit_flag(_commit_flag), + tpc_flag(_tpc_flag) +{} + +MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) : + numJrnlFiles(0), + autoJrnlExpand(false), + autoJrnlExpandMaxFiles(0), + jrnlFsizeSblks(0), + truncateFlag(false), + wCachePgSizeSblks(0), + wCacheNumPages(0), + tplNumJrnlFiles(0), + tplJrnlFsizeSblks(0), + tplWCachePgSizeSblks(0), + tplWCacheNumPages(0), + highestRid(0), + isInit(false), + envPath(envpath), + broker(broker_), + mgmtObject(), + agent(0) +{} + +u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName) +{ + u_int16_t p = param; + if (p < JRNL_MIN_NUM_FILES) { + p = JRNL_MIN_NUM_FILES; + QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter to minimum value."); + } else if (p > JRNL_MAX_NUM_FILES) { + p = JRNL_MAX_NUM_FILES; + QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value."); + } + return p; +} + +u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks) +{ + u_int32_t p = param; + u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE; + u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE; + if (p < min) { + p = min; + QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << min << "); changing this parameter to minimum value."); + } else if (p > max) { + p = max; + QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << max << "); changing this parameter to maximum value."); + } + if (wCachePgSizeSblks > p * JRNL_RMGR_PAGE_SIZE) { + std::ostringstream oss; + oss << "Cannot create store with file size less than write page cache size. [file size = " << p << " (" << (p * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]"; + THROW_STORE_EXCEPTION(oss.str()); + } + return p; +} + +u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName, const u_int16_t jrnlFsizePgs) +{ + u_int32_t p = param; + switch (p) + { + case 1: + case 2: + case 4: + case 8: + case 16: + case 32: + case 64: + case 128: + if (jrnlFsizePgs == 1) { + p = 64; + QPID_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")"); + } + break; + default: + if (p == 0) { + // For zero value, use default + p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; + QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); + } else { + // For any positive value, use closest value + if (p < 6) p = 4; + else if (p < 12) p = 8; + else if (p < 24) p = 16; + else if (p < 48) p = 32; + else if (p < 96) p = 64; + else p = 128; + QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")"); + } + } + return p; +} + +u_int16_t MessageStoreImpl::getJrnlWrNumPages(const u_int32_t wrPageSizeKib) +{ + u_int32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks + u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB). + switch (wrPageSizeKib) + { + case 1: + case 2: + case 4: + // 256 KiB total cache + return defTotWCacheSize / wrPageSizeSblks / 4; + case 8: + case 16: + // 512 KiB total cache + return defTotWCacheSize / wrPageSizeSblks / 2; + default: // 32, 64, 128 + // 1 MiB total cache + return defTotWCacheSize / wrPageSizeSblks; + } +} + +void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts, + bool& autoJrnlExpand, + u_int16_t& autoJrnlExpandMaxFiles, + const std::string& autoJrnlExpandMaxFilesParamName, + const u_int16_t numJrnlFiles, + const std::string& numJrnlFilesParamName) +{ + if (!opts->autoJrnlExpand) { + // auto-expand disabled + autoJrnlExpand = false; + autoJrnlExpandMaxFiles = 0; + return; + } + u_int16_t p = opts->autoJrnlExpandMaxFiles; + if (numJrnlFiles == JRNL_MAX_NUM_FILES) { + // num-jfiles at max; disable auto-expand + autoJrnlExpand = false; + autoJrnlExpandMaxFiles = 0; + QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") must be higher than parameter " + << numJrnlFilesParamName << " (" << numJrnlFiles << ") which is at the maximum allowable value; disabling auto-expand."); + return; + } + if (p > JRNL_MAX_NUM_FILES) { + // auto-expand-max-jfiles higher than max allowable, adjust + autoJrnlExpand = true; + autoJrnlExpandMaxFiles = JRNL_MAX_NUM_FILES; + QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") is above allowable maximum (" + << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value."); + return; + } + if (p && p == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) { + // num-jfiles is different from the default AND max-auto-expand-jfiles is still at default + // change value of max-auto-expand-jfiles + autoJrnlExpand = true; + if (2 * numJrnlFiles <= JRNL_MAX_NUM_FILES) { + autoJrnlExpandMaxFiles = 2 * numJrnlFiles <= JRNL_MAX_NUM_FILES ? 2 * numJrnlFiles : JRNL_MAX_NUM_FILES; + QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " adjusted from its default value (" + << defAutoJrnlExpandMaxFiles << ") to twice that of parameter " << numJrnlFilesParamName << " (" << autoJrnlExpandMaxFiles << ")."); + } else { + autoJrnlExpandMaxFiles = 2 * numJrnlFiles <= JRNL_MAX_NUM_FILES ? 2 * numJrnlFiles : JRNL_MAX_NUM_FILES; + QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " adjusted from its default to maximum allowable value (" + << JRNL_MAX_NUM_FILES << ") because of the value of " << numJrnlFilesParamName << " (" << numJrnlFiles << ")."); + } + return; + } + // No adjustments req'd, set values + autoJrnlExpand = true; + autoJrnlExpandMaxFiles = p; +} + +void MessageStoreImpl::initManagement () +{ + if (broker != 0) { + agent = broker->getManagementAgent(); + if (agent != 0) { + _qmf::Package packageInitializer(agent); + mgmtObject = _qmf::Store::shared_ptr ( + new _qmf::Store(agent, this, broker)); + + mgmtObject->set_location(storeDir); + mgmtObject->set_defaultInitialFileCount(numJrnlFiles); + mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE); + mgmtObject->set_tplIsInitialized(false); + mgmtObject->set_tplDirectory(getTplBaseDir()); + mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + mgmtObject->set_tplWritePages(tplWCacheNumPages); + mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles); + mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles); + + agent->addObject(mgmtObject, 0, true); + + // Initialize all existing queues (ie those recovered before management was initialized) + for (JournalListMapItr i=journalList.begin(); i!=journalList.end(); i++) { + i->second->initManagement(agent); + } + } + } +} + +bool MessageStoreImpl::init(const qpid::Options* options) +{ + // Extract and check options + const StoreOptions* opts = static_cast<const StoreOptions*>(options); + u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles"); + u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs"); + u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size", jrnlFsizePgs); + u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles"); + u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs"); + u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size", tplJrnlFSizePgs); + bool autoJrnlExpand; + u_int16_t autoJrnlExpandMaxFiles; + chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles"); + + // Pass option values to init(...) + return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, opts->truncateFlag, jrnlWrCachePageSizeKib, tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib, autoJrnlExpand, autoJrnlExpandMaxFiles); +} + +// These params, taken from options, are assumed to be correct and verified +bool MessageStoreImpl::init(const std::string& dir, + u_int16_t jfiles, + u_int32_t jfileSizePgs, + const bool truncateFlag, + u_int32_t wCachePageSizeKib, + u_int16_t tplJfiles, + u_int32_t tplJfileSizePgs, + u_int32_t tplWCachePageSizeKib, + bool autoJExpand, + u_int16_t autoJExpandMaxFiles) +{ + if (isInit) return true; + + // Set geometry members (converting to correct units where req'd) + numJrnlFiles = jfiles; + jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE; + wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks + wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib); + tplNumJrnlFiles = tplJfiles; + tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE; + tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks + tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib); + autoJrnlExpand = autoJExpand; + autoJrnlExpandMaxFiles = autoJExpandMaxFiles; + if (dir.size()>0) storeDir = dir; + + if (truncateFlag) + truncateInit(false); + else + init(); + + QPID_LOG(notice, "Store module initialized; store-dir=" << dir); + QPID_LOG(info, "> Default files per journal: " << jfiles); +// TODO: Uncomment these lines when auto-expand is enabled. +// QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled")); +// if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles); + QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)"); + QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)"); + QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages); + QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles); + QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)"); + QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)"); + QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages); + + return isInit; +} + +void MessageStoreImpl::init() +{ + const int retryMax = 3; + int bdbRetryCnt = 0; + do { + if (bdbRetryCnt++ > 0) + { + closeDbs(); + ::usleep(1000000); // 1 sec delay + QPID_LOG(error, "Previoius BDB store initialization failed, retrying (" << bdbRetryCnt << " of " << retryMax << ")..."); + } + + try { + journal::jdir::create_dir(getBdbBaseDir()); + + dbenv.reset(new DbEnv(0)); + dbenv->set_errpfx("msgstore"); + dbenv->set_lg_regionmax(256000); // default = 65000 + dbenv->open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0); + + // Databases are constructed here instead of the constructor so that the DB_RECOVER flag can be used + // against the database environment. Recover can only be performed if no databases have been created + // against the environment at the time of recovery, as recovery invalidates the environment. + queueDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(queueDb); + configDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(configDb); + exchangeDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(exchangeDb); + mappingDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(mappingDb); + bindingDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(bindingDb); + generalDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(generalDb); + + TxnCtxt txn; + txn.begin(dbenv.get(), false); + try { + open(queueDb, txn.get(), "queues.db", false); + open(configDb, txn.get(), "config.db", false); + open(exchangeDb, txn.get(), "exchanges.db", false); + open(mappingDb, txn.get(), "mappings.db", true); + open(bindingDb, txn.get(), "bindings.db", true); + open(generalDb, txn.get(), "general.db", false); + txn.commit(); + } catch (...) { txn.abort(); throw; } + // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure. + // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the + // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026 + tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); + isInit = true; + } catch (const DbException& e) { + if (e.get_errno() == DB_VERSION_MISMATCH) + { + QPID_LOG(error, "Database environment mismatch: This version of db4 does not match that which created the store database.: " << e.what()); + THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of db4 does not match that which created the store database. " + "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using " + "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e); + } + QPID_LOG(error, "BDB exception occurred while initializing store: " << e.what()); + if (bdbRetryCnt >= retryMax) + THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e); + } catch (const StoreException&) { + throw; + } catch (const journal::jexception& e) { + QPID_LOG(error, "Journal Exception occurred while initializing store: " << e); + THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what()); + } catch (...) { + QPID_LOG(error, "Unknown exception occurred while initializing store."); + throw; + } + } while (!isInit); +} + +void MessageStoreImpl::finalize() +{ + if (tplStorePtr.get() && tplStorePtr->is_ready()) tplStorePtr->stop(true); + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++) + { + JournalImpl* jQueue = i->second; + jQueue->resetDeleteCallback(); + if (jQueue->is_ready()) jQueue->stop(true); + } + } + + if (mgmtObject.get() != 0) { + mgmtObject->resourceDestroy(); + mgmtObject.reset(); + } +} + +void MessageStoreImpl::truncateInit(const bool saveStoreContent) +{ + if (isInit) { + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + if (journalList.size()) { // check no queues exist + std::ostringstream oss; + oss << "truncateInit() called with " << journalList.size() << " queues still in existence"; + THROW_STORE_EXCEPTION(oss.str()); + } + } + closeDbs(); + dbs.clear(); + if (tplStorePtr->is_ready()) tplStorePtr->stop(true); + dbenv->close(0); + isInit = false; + } + std::ostringstream oss; + oss << storeDir << "/" << storeTopLevelDir; + if (saveStoreContent) { + std::string dir = mrg::journal::jdir::push_down(storeDir, storeTopLevelDir, "cluster"); + QPID_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << "."); + } else { + mrg::journal::jdir::delete_dir(oss.str().c_str()); + QPID_LOG(notice, "Store directory " << oss.str() << " was truncated."); + } + init(); +} + +void MessageStoreImpl::chkTplStoreInit() +{ + // Prevent multiple threads from late-initializing the TPL + qpid::sys::Mutex::ScopedLock sl(tplInitLock); + if (!tplStorePtr->is_ready()) { + journal::jdir::create_dir(getTplBaseDir()); + tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks); + if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); + } +} + +void MessageStoreImpl::open(db_ptr db, + DbTxn* txn, + const char* file, + bool dupKey) +{ + if(dupKey) db->set_flags(DB_DUPSORT); + db->open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0); +} + +void MessageStoreImpl::closeDbs() +{ + for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) { + (*i)->close(0); + } + dbs.clear(); +} + +MessageStoreImpl::~MessageStoreImpl() +{ + finalize(); + try { + closeDbs(); + } catch (const DbException& e) { + QPID_LOG(error, "Error closing BDB databases: " << e.what()); + } catch (const journal::jexception& e) { + QPID_LOG(error, "Error: " << e.what()); + } catch (const std::exception& e) { + QPID_LOG(error, "Error: " << e.what()); + } catch (...) { + QPID_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()"); + } + + if (mgmtObject.get() != 0) { + mgmtObject->resourceDestroy(); + mgmtObject.reset(); + } +} + +void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, + const qpid::framing::FieldTable& args) +{ + checkInit(); + if (queue.getPersistenceId()) { + THROW_STORE_EXCEPTION("Queue already created: " + queue.getName()); + } + JournalImpl* jQueue = 0; + qpid::framing::FieldTable::ValuePtr value; + + u_int16_t localFileCount = numJrnlFiles; + bool localAutoExpandFlag = autoJrnlExpand; + u_int16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles; + u_int32_t localFileSizeSblks = jrnlFsizeSblks; + + value = args.get("qpid.file_count"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) + localFileCount = chkJrnlNumFilesParam((u_int16_t) value->get<int>(), "qpid.file_count"); + + value = args.get("qpid.file_size"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) + localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE; + + if (queue.getName().size() == 0) + { + QPID_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); + return; + } + + jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue), std::string("JournalData"), + defJournalGetEventsTimeout, defJournalFlushTimeout, agent, + boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + journalList[queue.getName()]=jQueue; + } + + value = args.get("qpid.auto_expand"); + if (value.get() != 0 && !value->empty() && value->convertsTo<bool>()) + localAutoExpandFlag = (bool) value->get<bool>(); + + value = args.get("qpid.auto_expand_max_jfiles"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) + localAutoExpandMaxFileCount = (u_int16_t) value->get<int>(); + + queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); + try { + // init will create the deque's for the init... + jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks); + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what()); + } + try { + if (!create(queueDb, queueIdSequence, queue)) { + THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName()); + } + } catch (const DbException& e) { + THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e); + } +} + +void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue) +{ + checkInit(); + destroy(queueDb, queue); + deleteBindingsForQueue(queue); + qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore(); + if (eqs) { + JournalImpl* jQueue = static_cast<JournalImpl*>(eqs); + jQueue->delete_jrnl_files(); + queue.setExternalQueueStore(0); // will delete the journal if exists + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + journalList.erase(queue.getName()); + } + } +} + +void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange, + const qpid::framing::FieldTable& /*args*/) +{ + checkInit(); + if (exchange.getPersistenceId()) { + THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName()); + } + try { + if (!create(exchangeDb, exchangeIdSequence, exchange)) { + THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName()); + } + } catch (const DbException& e) { + THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange.getName(), e); + } +} + +void MessageStoreImpl::destroy(const qpid::broker::PersistableExchange& exchange) +{ + checkInit(); + destroy(exchangeDb, exchange); + //need to also delete bindings + IdDbt key(exchange.getPersistenceId()); + bindingDb->del(0, &key, DB_AUTO_COMMIT); +} + +void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general) +{ + checkInit(); + if (general.getPersistenceId()) { + THROW_STORE_EXCEPTION("General configuration item already created"); + } + try { + if (!create(generalDb, generalIdSequence, general)) { + THROW_STORE_EXCEPTION("General configuration already exists"); + } + } catch (const DbException& e) { + THROW_STORE_EXCEPTION_2("Error creating general configuration", e); + } +} + +void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general) +{ + checkInit(); + destroy(generalDb, general); +} + +bool MessageStoreImpl::create(db_ptr db, + IdSequence& seq, + const qpid::broker::Persistable& p) +{ + u_int64_t id (seq.next()); + Dbt key(&id, sizeof(id)); + BufferValue value (p); + + int status; + TxnCtxt txn; + txn.begin(dbenv.get(), true); + try { + status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE); + txn.commit(); + } catch (...) { + txn.abort(); + throw; + } + if (status == DB_KEYEXIST) { + return false; + } else { + p.setPersistenceId(id); + return true; + } +} + +void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p) +{ + qpid::sys::Mutex::ScopedLock sl(bdbLock); + IdDbt key(p.getPersistenceId()); + db->del(0, &key, DB_AUTO_COMMIT); +} + + +void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e, + const qpid::broker::PersistableQueue& q, + const std::string& k, + const qpid::framing::FieldTable& a) +{ + checkInit(); + IdDbt key(e.getPersistenceId()); + BindingDbt value(e, q, k, a); + TxnCtxt txn; + txn.begin(dbenv.get(), true); + try { + put(bindingDb, txn.get(), key, value); + txn.commit(); + } catch (...) { + txn.abort(); + throw; + } +} + +void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e, + const qpid::broker::PersistableQueue& q, + const std::string& k, + const qpid::framing::FieldTable&) +{ + checkInit(); + deleteBinding(e, q, k); +} + +void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) +{ + checkInit(); + txn_list prepared; + recoverLockedMappings(prepared); + + queue_index queues;//id->queue + exchange_index exchanges;//id->exchange + message_index messages;//id->message + + TxnCtxt txn; + txn.begin(dbenv.get(), false); + try { + //read all queues, calls recoversMessages + recoverQueues(txn, registry, queues, prepared, messages); + + //recover exchange & bindings: + recoverExchanges(txn, registry, exchanges); + recoverBindings(txn, exchanges, queues); + + //recover general-purpose configuration + recoverGeneral(txn, registry); + + txn.commit(); + } catch (const DbException& e) { + txn.abort(); + THROW_STORE_EXCEPTION_2("Error on recovery", e); + } catch (...) { + txn.abort(); + throw; + } + + //recover transactions: + for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) { + const PreparedTransaction pt = *i; + if (mgmtObject.get() != 0) { + mgmtObject->inc_tplTransactionDepth(); + mgmtObject->inc_tplTxnPrepares(); + } + + std::string xid = pt.xid; + + // Restore data token state in TxnCtxt + TplRecoverMapCitr citr = tplRecoverMap.find(xid); + if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap"); + + // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call + // was interrupted part way through committing/aborting the impacted queues. Complete this process. + bool incomplTplTxnFlag = citr->second.deq_flag; + + if (citr->second.tpc_flag) { + // Dtx (2PC) transaction + TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence); + std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc); + tpcc->recoverDtok(citr->second.rid, xid); + tpcc->prepare(tplStorePtr.get()); + + qpid::broker::RecoverableTransaction::shared_ptr dtx; + if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn); + if (pt.enqueues.get()) { + for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) { + tpcc->addXidRecord(queues[j->first]->getExternalQueueStore()); + if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]); + } + } + if (pt.dequeues.get()) { + for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); j++) { + tpcc->addXidRecord(queues[j->first]->getExternalQueueStore()); + if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]); + } + } + + if (incomplTplTxnFlag) { + tpcc->complete(citr->second.commit_flag); + } + } else { + // Local (1PC) transaction + boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence)); + opcc->recoverDtok(citr->second.rid, xid); + opcc->prepare(tplStorePtr.get()); + + if (pt.enqueues.get()) { + for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) { + opcc->addXidRecord(queues[j->first]->getExternalQueueStore()); + } + } + if (pt.dequeues.get()) { + for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); j++) { + opcc->addXidRecord(queues[j->first]->getExternalQueueStore()); + } + } + if (incomplTplTxnFlag) { + opcc->complete(citr->second.commit_flag); + } else { + completed(*opcc.get(), citr->second.commit_flag); + } + } + } + registry.recoveryComplete(); +} + +void MessageStoreImpl::recoverQueues(TxnCtxt& txn, + qpid::broker::RecoveryManager& registry, + queue_index& queue_index, + txn_list& prepared, + message_index& messages) +{ + Cursor queues; + queues.open(queueDb, txn.get()); + + u_int64_t maxQueueId(1); + + IdDbt key; + Dbt value; + //read all queues + while (queues.next(key, value)) { + qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); + //create a Queue instance + qpid::broker::RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer); + //set the persistenceId and update max as required + queue->setPersistenceId(key.id); + + const std::string queueName = queue->getName().c_str(); + JournalImpl* jQueue = 0; + if (queueName.size() == 0) + { + QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue."); + break; + } + jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"), + defJournalGetEventsTimeout, defJournalFlushTimeout, agent, + boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + journalList[queueName] = jQueue; + } + queue->setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); + + try + { + long rcnt = 0L; // recovered msg count + long idcnt = 0L; // in-doubt msg count + u_int64_t thisHighestRid = 0ULL; + jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery + if (highestRid == 0ULL) + highestRid = thisHighestRid; + else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit + highestRid = thisHighestRid; + recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt); + QPID_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt."); + jQueue->recover_complete(); // start journal. + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what()); + } + //read all messages: done on a per queue basis if using Journal + + queue_index[key.id] = queue; + maxQueueId = std::max(key.id, maxQueueId); + } + + // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as + // the messageIdSequence is used for both queue journals and the tpl journal. + messageIdSequence.reset(highestRid + 1); + QPID_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec); + + queueIdSequence.reset(maxQueueId + 1); +} + + +void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, + qpid::broker::RecoveryManager& registry, + exchange_index& index) +{ + //TODO: this is a copy&paste from recoverQueues - refactor! + Cursor exchanges; + exchanges.open(exchangeDb, txn.get()); + + u_int64_t maxExchangeId(1); + IdDbt key; + Dbt value; + //read all exchanges + while (exchanges.next(key, value)) { + qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); + //create a Exchange instance + qpid::broker::RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer); + if (exchange) { + //set the persistenceId and update max as required + exchange->setPersistenceId(key.id); + index[key.id] = exchange; + QPID_LOG(info, "Recovered exchange \"" << exchange->getName() << '"'); + } + maxExchangeId = std::max(key.id, maxExchangeId); + } + exchangeIdSequence.reset(maxExchangeId + 1); +} + +void MessageStoreImpl::recoverBindings(TxnCtxt& txn, + exchange_index& exchanges, + queue_index& queues) +{ + Cursor bindings; + bindings.open(bindingDb, txn.get()); + + IdDbt key; + Dbt value; + while (bindings.next(key, value)) { + qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); + if (buffer.available() < 8) { + QPID_LOG(error, "Not enough data for binding: " << buffer.available()); + THROW_STORE_EXCEPTION("Not enough data for binding"); + } + uint64_t queueId = buffer.getLongLong(); + std::string queueName; + std::string routingkey; + qpid::framing::FieldTable args; + buffer.getShortString(queueName); + buffer.getShortString(routingkey); + buffer.get(args); + exchange_index::iterator exchange = exchanges.find(key.id); + queue_index::iterator queue = queues.find(queueId); + if (exchange != exchanges.end() && queue != queues.end()) { + //could use the recoverable queue here rather than the name... + exchange->second->bind(queueName, routingkey, args); + QPID_LOG(info, "Recovered binding exchange=" << exchange->second->getName() + << " key=" << routingkey + << " queue=" << queueName); + } else { + //stale binding, delete it + QPID_LOG(warning, "Deleting stale binding"); + bindings->del(0); + } + } +} + +void MessageStoreImpl::recoverGeneral(TxnCtxt& txn, + qpid::broker::RecoveryManager& registry) +{ + Cursor items; + items.open(generalDb, txn.get()); + + u_int64_t maxGeneralId(1); + IdDbt key; + Dbt value; + //read all items + while (items.next(key, value)) { + qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); + //create instance + qpid::broker::RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer); + //set the persistenceId and update max as required + config->setPersistenceId(key.id); + maxGeneralId = std::max(key.id, maxGeneralId); + } + generalIdSequence.reset(maxGeneralId + 1); +} + +void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, + qpid::broker::RecoveryManager& recovery, + qpid::broker::RecoverableQueue::shared_ptr& queue, + txn_list& prepared, + message_index& messages, + long& rcnt, + long& idcnt) +{ + size_t preambleLength = sizeof(u_int32_t)/*header size*/; + + JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); + DataTokenImpl dtok; + size_t readSize = 0; + unsigned msg_count = 0; + + // TODO: This optimization to skip reading if there are no enqueued messages to read + // breaks the python system test in phase 6 with "Exception: Cannot write lock file" + // Figure out what is breaking. + //bool read = jc->get_enq_cnt() > 0; + bool read = true; + + void* dbuff = NULL; size_t dbuffSize = 0; + void* xidbuff = NULL; size_t xidbuffSize = 0; + bool transientFlag = false; + bool externalFlag = false; + + dtok.set_wstate(DataTokenImpl::ENQ); + + // Read the message from the Journal. + try { + unsigned aio_sleep_cnt = 0; + while (read) { + mrg::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); + readSize = dtok.dsize(); + + switch (res) + { + case mrg::journal::RHM_IORES_SUCCESS: { + msg_count++; + qpid::broker::RecoverableMessage::shared_ptr msg; + char* data = (char*)dbuff; + + unsigned headerSize; + if (externalFlag) { + msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl + } else { + headerSize = qpid::framing::Buffer(data, preambleLength).getLong(); + qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ???? + msg = recovery.recoverMessage(headerBuff); + } + msg->setPersistenceId(dtok.rid()); + // At some future point if delivery attempts are stored, then this call would + // become optional depending on that information. + msg->setRedelivered(); + // Reset the TTL for the recovered message + msg->computeExpiration(broker->getExpiryPolicy()); + + u_int32_t contentOffset = headerSize + preambleLength; + u_int64_t contentSize = readSize - contentOffset; + if (msg->loadContent(contentSize) && !externalFlag) { + //now read the content + qpid::framing::Buffer contentBuff(data + contentOffset, contentSize); + msg->decodeContent(contentBuff); + } + + PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtok.rid()); + if (i == prepared.end()) { // not in prepared list + rcnt++; + queue->recover(msg); + } else { + u_int64_t rid = dtok.rid(); + std::string xid(i->xid); + TplRecoverMapCitr citr = tplRecoverMap.find(xid); + if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap"); + + // deq present in prepared list: this xid is part of incomplete txn commit/abort + // or this is a 1PC txn that must be rolled forward + if (citr->second.deq_flag || !citr->second.tpc_flag) { + if (jc->is_enqueued(rid, true)) { + // Enqueue is non-tx, dequeue tx + assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue + if (!citr->second.commit_flag) { + rcnt++; + queue->recover(msg); // recover message in abort case only + } + } else { + // Enqueue and/or dequeue tx + journal::txn_map& tmap = jc->get_txn_map(); + journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found + bool enq = false; + bool deq = false; + for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { + if (j->_enq_flag && j->_rid == rid) enq = true; + else if (!j->_enq_flag && j->_drid == rid) deq = true; + } + if (enq && !deq && citr->second.commit_flag) { + rcnt++; + queue->recover(msg); // recover txn message in commit case only + } + } + } else { + idcnt++; + messages[rid] = msg; + } + } + + dtok.reset(); + dtok.set_wstate(DataTokenImpl::ENQ); + + if (xidbuff) + ::free(xidbuff); + else if (dbuff) + ::free(dbuff); + aio_sleep_cnt = 0; + break; + } + case mrg::journal::RHM_IORES_PAGE_AIOWAIT: + if (++aio_sleep_cnt > MAX_AIO_SLEEPS) + THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()"); + ::usleep(AIO_SLEEP_TIME_US); + break; + case mrg::journal::RHM_IORES_EMPTY: + read = false; + break; // done with all messages. (add call in jrnl to test that _emap is empty.) + default: + std::ostringstream oss; + oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << mrg::journal::iores_str(res); + THROW_STORE_EXCEPTION(oss.str()); + } // switch + } // while + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what()); + } +} + +qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/, + uint64_t /*messageId*/, + unsigned& /*headerSize*/) +{ + throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage"); +} + +int MessageStoreImpl::enqueueMessage(TxnCtxt& txn, + IdDbt& msgId, + qpid::broker::RecoverableMessage::shared_ptr& msg, + queue_index& index, + txn_list& prepared, + message_index& messages) +{ + Cursor mappings; + mappings.open(mappingDb, txn.get()); + + IdDbt value; + + int count(0); + for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) { + if (index.find(value.id) == index.end()) { + QPID_LOG(warning, "Recovered message for queue that no longer exists"); + mappings->del(0); + } else { + qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id]; + if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) { + messages[msgId.id] = msg; + } else { + queue->recover(msg); + } + count++; + } + } + mappings.close(); + return count; +} + +void MessageStoreImpl::readTplStore() +{ + tplRecoverMap.clear(); + journal::txn_map& tmap = tplStorePtr->get_txn_map(); + DataTokenImpl dtok; + void* dbuff = NULL; size_t dbuffSize = 0; + void* xidbuff = NULL; size_t xidbuffSize = 0; + bool transientFlag = false; + bool externalFlag = false; + bool done = false; + try { + unsigned aio_sleep_cnt = 0; + while (!done) { + dtok.reset(); + dtok.set_wstate(DataTokenImpl::ENQ); + mrg::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); + switch (res) { + case mrg::journal::RHM_IORES_SUCCESS: { + // Every TPL record contains both data and an XID + assert(dbuffSize>0); + assert(xidbuffSize>0); + std::string xid(static_cast<const char*>(xidbuff), xidbuffSize); + bool is2PC = *(static_cast<char*>(dbuff)) != 0; + + // Check transaction details; add to recover map + journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found + if (!txnList.empty()) { // xid found in tmap + unsigned enqCnt = 0; + unsigned deqCnt = 0; + u_int64_t rid = 0; + + // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists. + // Note: will apply to both 1PC and 2PC transactions. + bool commitFlag = true; + + for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { + if (j->_enq_flag) { + rid = j->_rid; + enqCnt++; + } else { + commitFlag = j->_commit_flag; + deqCnt++; + } + } + assert(enqCnt == 1); + assert(deqCnt <= 1); + tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC))); + } + + ::free(xidbuff); + aio_sleep_cnt = 0; + break; + } + case mrg::journal::RHM_IORES_PAGE_AIOWAIT: + if (++aio_sleep_cnt > MAX_AIO_SLEEPS) + THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()"); + ::usleep(AIO_SLEEP_TIME_US); + break; + case mrg::journal::RHM_IORES_EMPTY: + done = true; + break; // done with all messages. (add call in jrnl to test that _emap is empty.) + default: + std::ostringstream oss; + oss << "readTplStore(): Unexpected result from journal read: " << mrg::journal::iores_str(res); + THROW_STORE_EXCEPTION(oss.str()); + } // switch + } + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what()); + } +} + +void MessageStoreImpl::recoverTplStore() +{ + if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) { + u_int64_t thisHighestRid = 0ULL; + tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); + if (highestRid == 0ULL) + highestRid = thisHighestRid; + else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit + highestRid = thisHighestRid; + + // Load tplRecoverMap by reading the TPL store + readTplStore(); + + tplStorePtr->recover_complete(); // start journal. + } +} + +void MessageStoreImpl::recoverLockedMappings(txn_list& txns) +{ + if (!tplStorePtr->is_ready()) + recoverTplStore(); + + // Abort unprepared xids and populate the locked maps + for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) { + LockedMappings::shared_ptr enq_ptr; + enq_ptr.reset(new LockedMappings); + LockedMappings::shared_ptr deq_ptr; + deq_ptr.reset(new LockedMappings); + txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr)); + } +} + +void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) +{ + if (tplStorePtr->is_ready()) { + tplStorePtr->read_reset(); + readTplStore(); + } else { + recoverTplStore(); + } + for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) { + // Discard all txns that are to be rolled forward/back and 1PC transactions + if (!i->second.deq_flag && i->second.tpc_flag) + xids.insert(i->first); + } +} + +void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/) +{ + throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage"); +} + +void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/) +{ + throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy"); +} + +void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/, + const std::string& /*data*/) +{ + throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent"); +} + +void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue, + const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + std::string& data, + u_int64_t offset, + u_int32_t length) +{ + checkInit(); + u_int64_t messageId (msg->getPersistenceId()); + + if (messageId != 0) { + try { + JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore()); + if (jc && jc->is_enqueued(messageId) ) { + if (!jc->loadMsgContent(messageId, data, length, offset)) { + std::ostringstream oss; + oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " is extern"; + THROW_STORE_EXCEPTION(oss.str()); + } + } else { + std::ostringstream oss; + oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " not enqueued"; + THROW_STORE_EXCEPTION(oss.str()); + } + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what()); + } + } else { + THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!"); + } +} + +void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue) +{ + if (queue.getExternalQueueStore() == 0) return; + checkInit(); + std::string qn = queue.getName(); + try { + JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore()); + if (jc) { + // TODO: check if this result should be used... + /*mrg::journal::iores res =*/ jc->flush(); + } + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() ); + } +} + +void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, + const qpid::broker::PersistableQueue& queue) +{ + checkInit(); + u_int64_t queueId (queue.getPersistenceId()); + u_int64_t messageId (msg->getPersistenceId()); + if (queueId == 0) { + THROW_STORE_EXCEPTION("Queue not created: " + queue.getName()); + } + + TxnCtxt implicit; + TxnCtxt* txn = 0; + if (ctxt) { + txn = check(ctxt); + } else { + txn = &implicit; + } + + bool newId = false; + if (messageId == 0) { + messageId = messageIdSequence.next(); + msg->setPersistenceId(messageId); + newId = true; + } + store(&queue, txn, msg, newId); + + // add queue* to the txn map.. + if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); +} + +u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message) +{ + u_int32_t headerSize = message->encodedHeaderSize(); + u_int64_t size = message->encodedSize() + sizeof(u_int32_t); + try { buff = std::vector<char>(size); } // long + headers + content + catch (const std::exception& e) { + std::ostringstream oss; + oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what(); + THROW_STORE_EXCEPTION(oss.str()); + } + qpid::framing::Buffer buffer(&buff[0],size); + buffer.putLong(headerSize); + message->encode(buffer); + return size; +} + +void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue, + TxnCtxt* txn, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message, + bool /*newId*/) +{ + std::vector<char> buff; + u_int64_t size = msgEncode(buff, message); + + try { + if (queue) { + boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl); + dtokp->addRef(); + dtokp->setSourceMessage(message); + dtokp->set_external_rid(true); + dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id) + + JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); + if (txn->getXid().empty()) { + jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message->isPersistent()); + } else { + jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn->getXid(), !message->isPersistent()); + } + } else { + THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL.")); + } + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " + + e.what()); + } +} + +void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, + const qpid::broker::PersistableQueue& queue) +{ + checkInit(); + u_int64_t queueId (queue.getPersistenceId()); + u_int64_t messageId (msg->getPersistenceId()); + if (queueId == 0) { + THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\" has null queue Id (has not been created)"); + } + if (messageId == 0) { + THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\": Dequeuing message with null persistence Id."); + } + + TxnCtxt implicit; + TxnCtxt* txn = 0; + if (ctxt) { + txn = check(ctxt); + } else { + txn = &implicit; + } + + // add queue* to the txn map.. + if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); + async_dequeue(ctxt, msg, queue); + + msg->dequeueComplete(); +} + +void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, + const qpid::broker::PersistableQueue& queue) +{ + boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl); + ddtokp->setSourceMessage(msg); + ddtokp->set_external_rid(true); + ddtokp->set_rid(messageIdSequence.next()); + ddtokp->set_dequeue_rid(msg->getPersistenceId()); + ddtokp->set_wstate(DataTokenImpl::ENQ); + std::string tid; + if (ctxt) { + TxnCtxt* txn = check(ctxt); + tid = txn->getXid(); + } + // Manually increase the ref count, as raw pointers are used beyond this point + ddtokp->addRef(); + try { + JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore()); + if (tid.empty()) { + jc->dequeue_data_record(ddtokp.get()); + } else { + jc->dequeue_txn_data_record(ddtokp.get(), tid); + } + } catch (const journal::jexception& e) { + ddtokp->release(); + THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what()); + } +} + +u_int32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) +{ + checkInit(); + return 0; +} + +void MessageStoreImpl::completed(TxnCtxt& txn, + bool commit) +{ + try { + chkTplStoreInit(); // Late initialize (if needed) + + // Nothing to do if not prepared + if (txn.getDtok()->is_enqueued()) { + txn.incrDtokRef(); + DataTokenImpl* dtokp = txn.getDtok(); + dtokp->set_dequeue_rid(dtokp->rid()); + dtokp->set_rid(messageIdSequence.next()); + tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit); + } + txn.complete(commit); + if (mgmtObject.get() != 0) { + mgmtObject->dec_tplTransactionDepth(); + if (commit) + mgmtObject->inc_tplTxnCommits(); + else + mgmtObject->inc_tplTxnAborts(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what()); + throw; + } +} + +std::auto_ptr<qpid::broker::TransactionContext> MessageStoreImpl::begin() +{ + checkInit(); + // pass sequence number for c/a + return std::auto_ptr<qpid::broker::TransactionContext>(new TxnCtxt(&messageIdSequence)); +} + +std::auto_ptr<qpid::broker::TPCTransactionContext> MessageStoreImpl::begin(const std::string& xid) +{ + checkInit(); + IdSequence* jtx = &messageIdSequence; + // pass sequence number for c/a + return std::auto_ptr<qpid::broker::TPCTransactionContext>(new TPCTxnCtxt(xid, jtx)); +} + +void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt) +{ + checkInit(); + TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt); + if(!txn) throw qpid::broker::InvalidTransactionContextException(); + localPrepare(txn); +} + +void MessageStoreImpl::localPrepare(TxnCtxt* ctxt) +{ + try { + chkTplStoreInit(); // Late initialize (if needed) + + // This sync is required to ensure multi-queue atomicity - ie all txn data + // must hit the disk on *all* queues before the TPL prepare (enq) is written. + ctxt->sync(); + + ctxt->incrDtokRef(); + DataTokenImpl* dtokp = ctxt->getDtok(); + dtokp->set_external_rid(true); + dtokp->set_rid(messageIdSequence.next()); + char tpcFlag = static_cast<char>(ctxt->isTPC()); + tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false); + ctxt->prepare(tplStorePtr.get()); + // make sure all the data is written to disk before returning + ctxt->sync(); + if (mgmtObject.get() != 0) { + mgmtObject->inc_tplTransactionDepth(); + mgmtObject->inc_tplTxnPrepares(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what()); + throw; + } +} + +void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt) +{ + checkInit(); + TxnCtxt* txn(check(&ctxt)); + if (!txn->isTPC()) { + if (txn->impactedQueuesEmpty()) return; + localPrepare(dynamic_cast<TxnCtxt*>(txn)); + } + completed(*dynamic_cast<TxnCtxt*>(txn), true); +} + +void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt) +{ + checkInit(); + TxnCtxt* txn(check(&ctxt)); + if (!txn->isTPC()) { + if (txn->impactedQueuesEmpty()) return; + localPrepare(dynamic_cast<TxnCtxt*>(txn)); + } + completed(*dynamic_cast<TxnCtxt*>(txn), false); +} + +TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt) +{ + TxnCtxt* txn = dynamic_cast<TxnCtxt*>(ctxt); + if(!txn) throw qpid::broker::InvalidTransactionContextException(); + return txn; +} + +void MessageStoreImpl::put(db_ptr db, + DbTxn* txn, + Dbt& key, + Dbt& value) +{ + try { + int status = db->put(txn, &key, &value, DB_NODUPDATA); + if (status == DB_KEYEXIST) { + THROW_STORE_EXCEPTION("duplicate data"); + } else if (status) { + THROW_STORE_EXCEPTION(DbEnv::strerror(status)); + } + } catch (const DbException& e) { + THROW_STORE_EXCEPTION(e.what()); + } +} + +void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue) +{ + TxnCtxt txn; + txn.begin(dbenv.get(), true); + try { + { + Cursor bindings; + bindings.open(bindingDb, txn.get()); + + IdDbt key; + Dbt value; + while (bindings.next(key, value)) { + qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); + if (buffer.available() < 8) { + THROW_STORE_EXCEPTION("Not enough data for binding"); + } + uint64_t queueId = buffer.getLongLong(); + if (queue.getPersistenceId() == queueId) { + bindings->del(0); + QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); + } + } + } + txn.commit(); + } catch (const std::exception& e) { + txn.abort(); + THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what()); + } catch (...) { + txn.abort(); + throw; + } + QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId()); +} + +void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange, + const qpid::broker::PersistableQueue& queue, + const std::string& bkey) +{ + TxnCtxt txn; + txn.begin(dbenv.get(), true); + try { + { + Cursor bindings; + bindings.open(bindingDb, txn.get()); + + IdDbt key(exchange.getPersistenceId()); + Dbt value; + + for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) { + qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); + if (buffer.available() < 8) { + THROW_STORE_EXCEPTION("Not enough data for binding"); + } + uint64_t queueId = buffer.getLongLong(); + if (queue.getPersistenceId() == queueId) { + std::string q; + std::string k; + buffer.getShortString(q); + buffer.getShortString(k); + if (bkey == k) { + bindings->del(0); + QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); + } + } + } + } + txn.commit(); + } catch (const std::exception& e) { + txn.abort(); + THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what()); + } catch (...) { + txn.abort(); + throw; + } +} + +std::string MessageStoreImpl::getJrnlBaseDir() +{ + std::ostringstream dir; + dir << storeDir << "/" << storeTopLevelDir << "/jrnl/" ; + return dir.str(); +} + +std::string MessageStoreImpl::getBdbBaseDir() +{ + std::ostringstream dir; + dir << storeDir << "/" << storeTopLevelDir << "/dat/" ; + return dir.str(); +} + +std::string MessageStoreImpl::getTplBaseDir() +{ + std::ostringstream dir; + dir << storeDir << "/" << storeTopLevelDir << "/tpl/" ; + return dir.str(); +} + +std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/ +{ + return getJrnlHashDir(queue.getName().c_str()); +} + +u_int32_t MessageStoreImpl::bHash(const std::string str) +{ + // Daniel Bernstein hash fn + u_int32_t h = 0; + for (std::string::const_iterator i = str.begin(); i < str.end(); i++) + h = 33*h + *i; + return h; +} + +std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) //for exmaple /var/rhm/ + queueDir/ +{ + std::stringstream dir; + dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4); + dir << (bHash(queueName.c_str()) % 29); // Use a prime number for better distribution across dirs + dir << "/" << queueName << "/"; + return dir.str(); +} + +std::string MessageStoreImpl::getStoreDir() const { return storeDir; } + +void MessageStoreImpl::journalDeleted(JournalImpl& j) { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + journalList.erase(j.id()); +} + +MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : + qpid::Options(name), + numJrnlFiles(defNumJrnlFiles), + autoJrnlExpand(defAutoJrnlExpand), + autoJrnlExpandMaxFiles(defAutoJrnlExpandMaxFiles), + jrnlFsizePgs(defJrnlFileSizePgs), + truncateFlag(defTruncateFlag), + wCachePageSizeKib(defWCachePageSize), + tplNumJrnlFiles(defTplNumJrnlFiles), + tplJrnlFsizePgs(defTplJrnlFileSizePgs), + tplWCachePageSizeKib(defTplWCachePageSize) +{ + std::ostringstream oss1; + oss1 << "Default number of files for each journal instance (queue). [Allowable values: " << + JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]"; + std::ostringstream oss2; + oss2 << "Default size for each journal file in multiples of read pages (1 read page = 64KiB). [Allowable values: " << + JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]"; + std::ostringstream oss3; + oss3 << "Number of files for transaction prepared list journal instance. [Allowable values: " << + JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]"; + std::ostringstream oss4; + oss4 << "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64KiB) [Allowable values: " << + JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]"; + addOptions() + ("store-dir", qpid::optValue(storeDir, "DIR"), + "Store directory location for persistence (instead of using --data-dir value). " + "Required if --no-data-dir is also used.") + ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), oss1.str().c_str()) + ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), oss2.str().c_str()) +// TODO: Uncomment these lines when auto-expand is enabled. +// ("auto-expand", qpid::optValue(autoJrnlExpand, "yes|no"), +// "If yes|true|1, allows journal to auto-expand by adding additional journal files as needed. " +// "If no|false|0, the number of journal files will remain fixed (num-jfiles).") +// ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"), +// "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.") + ("truncate", qpid::optValue(truncateFlag, "yes|no"), + "If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve " + "the existing store files for recovery.") + ("wcache-page-size", qpid::optValue(wCachePageSizeKib, "N"), + "Size of the pages in the write page cache in KiB. " + "Allowable values - powers of 2: 1, 2, 4, ... , 128. " + "Lower values decrease latency at the expense of throughput.") + ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"), oss3.str().c_str()) + ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"), oss4.str().c_str()) + ("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"), + "Size of the pages in the transaction prepared list write page cache in KiB. " + "Allowable values - powers of 2: 1, 2, 4, ... , 128. " + "Lower values decrease latency at the expense of throughput.") + ; +} + +}} |