diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/legacystore/MessageStoreImpl.h | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/legacystore/MessageStoreImpl.h')
-rw-r--r-- | cpp/src/qpid/legacystore/MessageStoreImpl.h | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/cpp/src/qpid/legacystore/MessageStoreImpl.h b/cpp/src/qpid/legacystore/MessageStoreImpl.h new file mode 100644 index 0000000000..68aceedfbb --- /dev/null +++ b/cpp/src/qpid/legacystore/MessageStoreImpl.h @@ -0,0 +1,380 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H +#define QPID_LEGACYSTORE_MESSAGESTOREIMPL_H + +#include <string> + +#include "db-inc.h" +#include "qpid/legacystore/Cursor.h" +#include "qpid/legacystore/IdDbt.h" +#include "qpid/legacystore/IdSequence.h" +#include "qpid/legacystore/JournalImpl.h" +#include "qpid/legacystore/jrnl/jcfg.h" +#include "qpid/legacystore/PreparedTransaction.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/MessageStore.h" +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/legacystore/Store.h" +#include "qpid/legacystore/TxnCtxt.h" + +// Assume DB_VERSION_MAJOR == 4 +#if (DB_VERSION_MINOR == 2) +#include <errno.h> +#define DB_BUFFER_SMALL ENOMEM +#endif + +namespace qpid { namespace sys { +class Timer; +}} + +namespace mrg { +namespace msgstore { + +/** + * An implementation of the MessageStore interface based on Berkeley DB + */ +class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::management::Manageable +{ + public: + typedef boost::shared_ptr<Db> db_ptr; + typedef boost::shared_ptr<DbEnv> dbEnv_ptr; + + struct StoreOptions : public qpid::Options { + StoreOptions(const std::string& name="Store Options"); + std::string clusterName; + std::string storeDir; + u_int16_t numJrnlFiles; + bool autoJrnlExpand; + u_int16_t autoJrnlExpandMaxFiles; + u_int32_t jrnlFsizePgs; + bool truncateFlag; + u_int32_t wCachePageSizeKib; + u_int16_t tplNumJrnlFiles; + u_int32_t tplJrnlFsizePgs; + u_int32_t tplWCachePageSizeKib; + }; + + protected: + typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index; + typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index; + typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index; + + typedef LockedMappings::map txn_lock_map; + typedef boost::ptr_list<PreparedTransaction> txn_list; + + // Structs for Transaction Recover List (TPL) recover state + struct TplRecoverStruct { + u_int64_t rid; // rid of TPL record + bool deq_flag; + bool commit_flag; + bool tpc_flag; + TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag); + }; + typedef TplRecoverStruct TplRecover; + typedef std::pair<std::string, TplRecover> TplRecoverMapPair; + typedef std::map<std::string, TplRecover> TplRecoverMap; + typedef TplRecoverMap::const_iterator TplRecoverMapCitr; + + typedef std::map<std::string, JournalImpl*> JournalListMap; + typedef JournalListMap::iterator JournalListMapItr; + + // Default store settings + static const u_int16_t defNumJrnlFiles = 8; + static const u_int32_t defJrnlFileSizePgs = 24; + static const bool defTruncateFlag = false; + static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; + static const u_int16_t defTplNumJrnlFiles = 8; + static const u_int32_t defTplJrnlFileSizePgs = 24; + static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8; + // TODO: set defAutoJrnlExpand to true and defAutoJrnlExpandMaxFiles to 16 when auto-expand comes on-line + static const bool defAutoJrnlExpand = false; + static const u_int16_t defAutoJrnlExpandMaxFiles = 0; + + static const std::string storeTopLevelDir; + static qpid::sys::Duration defJournalGetEventsTimeout; + static qpid::sys::Duration defJournalFlushTimeout; + + std::list<db_ptr> dbs; + dbEnv_ptr dbenv; + db_ptr queueDb; + db_ptr configDb; + db_ptr exchangeDb; + db_ptr mappingDb; + db_ptr bindingDb; + db_ptr generalDb; + + // Pointer to Transaction Prepared List (TPL) journal instance + boost::shared_ptr<TplJournalImpl> tplStorePtr; + TplRecoverMap tplRecoverMap; + qpid::sys::Mutex tplInitLock; + JournalListMap journalList; + qpid::sys::Mutex journalListLock; + qpid::sys::Mutex bdbLock; + + IdSequence queueIdSequence; + IdSequence exchangeIdSequence; + IdSequence generalIdSequence; + IdSequence messageIdSequence; + std::string storeDir; + u_int16_t numJrnlFiles; + bool autoJrnlExpand; + u_int16_t autoJrnlExpandMaxFiles; + u_int32_t jrnlFsizeSblks; + bool truncateFlag; + u_int32_t wCachePgSizeSblks; + u_int16_t wCacheNumPages; + u_int16_t tplNumJrnlFiles; + u_int32_t tplJrnlFsizeSblks; + u_int32_t tplWCachePgSizeSblks; + u_int16_t tplWCacheNumPages; + u_int64_t highestRid; + bool isInit; + const char* envPath; + qpid::broker::Broker* broker; + + qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject; + qpid::management::ManagementAgent* agent; + + + // Parameter validation and calculation + static u_int16_t chkJrnlNumFilesParam(const u_int16_t param, + const std::string paramName); + static u_int32_t chkJrnlFileSizeParam(const u_int32_t param, + const std::string paramName, + const u_int32_t wCachePgSizeSblks = 0); + static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param, + const std::string paramName, + const u_int16_t jrnlFsizePgs); + static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib); + void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts, + bool& autoJrnlExpand, + u_int16_t& autoJrnlExpandMaxFiles, + const std::string& autoJrnlExpandMaxFilesParamName, + const u_int16_t numJrnlFiles, + const std::string& numJrnlFilesParamName); + + void init(); + + void recoverQueues(TxnCtxt& txn, + qpid::broker::RecoveryManager& recovery, + queue_index& index, + txn_list& locked, + message_index& messages); + void recoverMessages(TxnCtxt& txn, + qpid::broker::RecoveryManager& recovery, + queue_index& index, + txn_list& locked, + message_index& prepared); + void recoverMessages(TxnCtxt& txn, + qpid::broker::RecoveryManager& recovery, + qpid::broker::RecoverableQueue::shared_ptr& queue, + txn_list& locked, + message_index& prepared, + long& rcnt, + long& idcnt); + qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery, + uint64_t mId, + unsigned& headerSize); + void recoverExchanges(TxnCtxt& txn, + qpid::broker::RecoveryManager& recovery, + exchange_index& index); + void recoverBindings(TxnCtxt& txn, + exchange_index& exchanges, + queue_index& queues); + void recoverGeneral(TxnCtxt& txn, + qpid::broker::RecoveryManager& recovery); + int enqueueMessage(TxnCtxt& txn, + IdDbt& msgId, + qpid::broker::RecoverableMessage::shared_ptr& msg, + queue_index& index, + txn_list& locked, + message_index& prepared); + void readTplStore(); + void recoverTplStore(); + void recoverLockedMappings(txn_list& txns); + TxnCtxt* check(qpid::broker::TransactionContext* ctxt); + u_int64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message); + void store(const qpid::broker::PersistableQueue* queue, + TxnCtxt* txn, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message, + bool newId); + void async_dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, + const qpid::broker::PersistableQueue& queue); + void destroy(db_ptr db, + const qpid::broker::Persistable& p); + bool create(db_ptr db, + IdSequence& seq, + const qpid::broker::Persistable& p); + void completed(TxnCtxt& txn, + bool commit); + void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue); + void deleteBinding(const qpid::broker::PersistableExchange& exchange, + const qpid::broker::PersistableQueue& queue, + const std::string& key); + + void put(db_ptr db, + DbTxn* txn, + Dbt& key, + Dbt& value); + void open(db_ptr db, + DbTxn* txn, + const char* file, + bool dupKey); + void closeDbs(); + + // journal functions + void createJrnlQueue(const qpid::broker::PersistableQueue& queue); + u_int32_t bHash(const std::string str); + std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/ + std::string getJrnlHashDir(const std::string& queueName); + std::string getJrnlBaseDir(); + std::string getBdbBaseDir(); + std::string getTplBaseDir(); + inline void checkInit() { + // TODO: change the default dir to ~/.qpidd + if (!isInit) { init("/tmp"); isInit = true; } + } + void chkTplStoreInit(); + + // debug aid for printing XIDs that may contain non-printable chars + static std::string xid2str(const std::string xid) { + std::ostringstream oss; + oss << std::hex << std::setfill('0'); + for (unsigned i=0; i<xid.size(); i++) { + if (isprint(xid[i])) + oss << xid[i]; + else + oss << "/" << std::setw(2) << (int)((char)xid[i]); + } + return oss.str(); + } + + public: + typedef boost::shared_ptr<MessageStoreImpl> shared_ptr; + + MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0); + + virtual ~MessageStoreImpl(); + + bool init(const qpid::Options* options); + + bool init(const std::string& dir, + u_int16_t jfiles = defNumJrnlFiles, + u_int32_t jfileSizePgs = defJrnlFileSizePgs, + const bool truncateFlag = false, + u_int32_t wCachePageSize = defWCachePageSize, + u_int16_t tplJfiles = defTplNumJrnlFiles, + u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs, + u_int32_t tplWCachePageSize = defTplWCachePageSize, + bool autoJExpand = defAutoJrnlExpand, + u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles); + + void truncateInit(const bool saveStoreContent = false); + + void initManagement (); + + void finalize(); + + void create(qpid::broker::PersistableQueue& queue, + const qpid::framing::FieldTable& args); + + void destroy(qpid::broker::PersistableQueue& queue); + + void create(const qpid::broker::PersistableExchange& queue, + const qpid::framing::FieldTable& args); + + void destroy(const qpid::broker::PersistableExchange& queue); + + void bind(const qpid::broker::PersistableExchange& exchange, + const qpid::broker::PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + void unbind(const qpid::broker::PersistableExchange& exchange, + const qpid::broker::PersistableQueue& queue, + const std::string& key, + const qpid::framing::FieldTable& args); + + void create(const qpid::broker::PersistableConfig& config); + + void destroy(const qpid::broker::PersistableConfig& config); + + void recover(qpid::broker::RecoveryManager& queues); + + void stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg); + + void destroy(qpid::broker::PersistableMessage& msg); + + void appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + const std::string& data); + + void loadContent(const qpid::broker::PersistableQueue& queue, + const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, + std::string& data, + uint64_t offset, + uint32_t length); + + void enqueue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, + const qpid::broker::PersistableQueue& queue); + + void dequeue(qpid::broker::TransactionContext* ctxt, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, + const qpid::broker::PersistableQueue& queue); + + void flush(const qpid::broker::PersistableQueue& queue); + + u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue); + + void collectPreparedXids(std::set<std::string>& xids); + + std::auto_ptr<qpid::broker::TransactionContext> begin(); + + std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid); + + void prepare(qpid::broker::TPCTransactionContext& ctxt); + + void localPrepare(TxnCtxt* ctxt); + + void commit(qpid::broker::TransactionContext& ctxt); + + void abort(qpid::broker::TransactionContext& ctxt); + + qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const + { return mgmtObject; } + + inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&) + { return qpid::management::Manageable::STATUS_OK; } + + std::string getStoreDir() const; + + private: + void journalDeleted(JournalImpl&); + +}; // class MessageStoreImpl + +} // namespace msgstore +} // namespace mrg + +#endif // ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H |