diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2014-01-09 17:26:10 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2014-01-09 17:26:10 +0000 |
| commit | f05a68597934004986f051fb7a02a519d7cce3dd (patch) | |
| tree | 26a532f4950e63024d70805e3f7311de4f67d634 /qpid/cpp/src | |
| parent | 643fd0c2ae8f501676be48c02c06d913a3419436 (diff) | |
| download | qpid-python-f05a68597934004986f051fb7a02a519d7cce3dd.tar.gz | |
QPID-5460: [linearstore] Recovery of store which contains prepared but incomplete transactions results in message loss
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556892 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/ISSUES | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/JournalImpl.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 115 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp | 19 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/jcfg.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp | 56 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_map.h | 28 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp | 8 |
12 files changed, 132 insertions, 131 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 7132a1f4c9..595730135d 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -58,8 +58,9 @@ Current bugs and performance issues: 4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time 5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071) 6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL -6. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL -7. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs +7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL +8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs +9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery Code tidy-up ------------ diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index a71db3d384..18cf0619e4 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -193,6 +193,7 @@ JournalImpl::recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFileP } */ + // TODO: This is ugly, find a way for RecoveryManager to use boost::ptr_list<PreparedTransaction>* directly if (prep_tx_list_ptr) { // Create list of prepared xids std::vector<std::string> prep_xid_list; @@ -209,8 +210,8 @@ JournalImpl::recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFileP if (prep_tx_list_ptr) { for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) { - ::qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found - for (::qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { + ::qpid::linearstore::journal::txn_data_list_t tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found + for (::qpid::linearstore::journal::tdl_itr_t tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { if (tdl_itr->enq_flag_) { // enqueue op i->enqueues->add(queue_id, tdl_itr->rid_); } else { // dequeue op diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index ca71d1b780..3df194d479 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -341,7 +341,7 @@ void MessageStoreImpl::chkTplStoreInit() qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { qpid::linearstore::journal::jdir::create_dir(getTplBaseDir()); - tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks); + tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks); if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } } @@ -584,6 +584,13 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) txn_list prepared; recoverLockedMappings(prepared); + std::ostringstream oss; + oss << "Recovered transaction prepared list:"; + for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) { + oss << std::endl << " " << str2hexnum(i->xid); + } + QLS_LOG(debug, oss.str()); + queue_index queues;//id->queue exchange_index exchanges;//id->exchange message_index messages;//id->message @@ -620,39 +627,20 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } std::string xid = pt.xid; - qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid); + qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid); if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map"); - uint16_t enqCnt = 0UL; - uint16_t deqCnt = 0UL; - uint16_t tpcCnt = 0UL; - uint16_t abortCnt = 0UL; - uint64_t rid = 0ULL; - for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { - if (j->enq_flag_) { - ++enqCnt; - rid = j->rid_; - } else { - ++deqCnt; - } - if (!j->commit_flag_) { - ++abortCnt; - } - if (j->tpc_flag_) { - ++tpcCnt; - } - } - if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recover(): Inconsistent TPL 2PC count"); - bool commitFlag = abortCnt == 0; + qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl); + bool commitFlag = txn_op_stats.abortCnt == 0; // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call // was interrupted part way through committing/aborting the impacted queues. Complete this process. - bool incomplTplTxnFlag = deqCnt > 0; + bool incomplTplTxnFlag = txn_op_stats.deqCnt > 0; - if (tpcCnt > 0) { + if (txn_op_stats.tpcCnt > 0) { // Dtx (2PC) transaction TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence); std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc); - tpcc->recoverDtok(rid, xid); + tpcc->recoverDtok(txn_op_stats.rid, xid); tpcc->prepare(tplStorePtr.get()); qpid::broker::RecoverableTransaction::shared_ptr dtx; @@ -676,7 +664,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } else { // Local (1PC) transaction boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence)); - opcc->recoverDtok(rid, xid); + opcc->recoverDtok(txn_op_stats.rid, xid); opcc->prepare(tplStorePtr.get()); if (pt.enqueues.get()) { @@ -919,7 +907,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl } else { headerSize = qpid::framing::Buffer(data, preambleLength).getLong(); - qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ???? + qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); msg = recovery.recoverMessage(headerBuff); } msg->setPersistenceId(dtok.rid()); @@ -944,45 +932,30 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } else { uint64_t rid = dtok.rid(); std::string xid(i->xid); - qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid); + qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid); if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map"); - uint16_t enqCnt = 0UL; - uint16_t deqCnt = 0UL; - uint16_t tpcCnt = 0UL; - uint16_t abortCnt = 0UL; - for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { - if (j->enq_flag_) { - ++enqCnt; - } else { - ++deqCnt; - } - if (!j->commit_flag_) { - ++abortCnt; - } - if (j->tpc_flag_) { - ++tpcCnt; - } - } - if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recoverMessages(): Inconsistent TPL 2PC count"); - if (deqCnt > 0 || tpcCnt == 0) { + qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl); + if (txn_op_stats.deqCnt > 0 || txn_op_stats.tpcCnt == 0) { if (jc->is_enqueued(rid, true)) { // Enqueue is non-tx, dequeue tx assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue - if (abortCnt > 0) { + if (txn_op_stats.abortCnt > 0) { rcnt++; queue->recover(msg); // recover message in abort case only } } else { // Enqueue and/or dequeue tx qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map(); - qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found + qpid::linearstore::journal::txn_data_list_t txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found bool enq = false; bool deq = false; - for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { - if (j->enq_flag_ && j->rid_ == rid) enq = true; - else if (!j->enq_flag_ && j->drid_ == rid) deq = true; + for (qpid::linearstore::journal::tdl_itr_t j = txnList.begin(); j<txnList.end(); j++) { + if (j->enq_flag_ && j->rid_ == rid) + enq = true; + else if (!j->enq_flag_ && j->drid_ == rid) + deq = true; } - if (enq && !deq && abortCnt == 0) { + if (enq && !deq && txn_op_stats.abortCnt == 0) { rcnt++; queue->recover(msg); // recover txn message in commit case only } @@ -1101,27 +1074,10 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) std::vector<std::string> xidList; tplStorePtr->get_txn_map().xid_list(xidList); for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) { - qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i); - uint16_t enqCnt = 0UL; - uint16_t deqCnt = 0UL; - uint16_t tpcCnt = 0UL; - uint16_t abortCnt = 0UL; - for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { - if (j->enq_flag_) { - ++enqCnt; - } else { - ++deqCnt; - } - if (!j->commit_flag_) { - ++abortCnt; - } - if (j->tpc_flag_) { - ++tpcCnt; - } - } - if (tpcCnt > 0) { - if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::collectPreparedXids: Inconsistent TPL 2PC count"); - if (enqCnt - deqCnt > 0) { + qpid::linearstore::journal::txn_data_list_t tdl = tplStorePtr->get_txn_map().get_tdata_list(*i); + qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl); + if (txn_op_stats.tpcCnt > 0) { + if (txn_op_stats.enqCnt - txn_op_stats.deqCnt > 0) { xids.insert(*i); } } @@ -1554,6 +1510,15 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j_) { journalList.erase(j_.id()); } +std::string MessageStoreImpl::str2hexnum(const std::string& str) { + std::ostringstream oss; + oss << "(" << str.size() << ")0x" << std::hex; + for (unsigned i=str.size(); i>0; --i) { + oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1]; + } + return oss.str(); +} + MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : qpid::Options(name_), truncateFlag(defTruncateFlag), diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 95667be82e..3157b9be9d 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -235,18 +235,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem } void chkTplStoreInit(); - // debug aid for printing XIDs that may contain non-printable chars - static std::string xid2str(const std::string xid) { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - for (unsigned i=0; i<xid.size(); i++) { - if (isprint(xid[i])) - oss << xid[i]; - else - oss << "/" << std::setw(2) << (int)((char)xid[i]); - } - return oss.str(); - } + static std::string str2hexnum(const std::string& str); public: typedef boost::shared_ptr<MessageStoreImpl> shared_ptr; diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 6050d6e899..67ad8a04a0 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -117,14 +117,13 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr std::vector<std::string>::const_iterator pitr = std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr); if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list - txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found + txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found // Unlock any affected enqueues in emap - for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) { + for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) { if (i->enq_flag_) { // enq op - decrement enqueue count fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount(); } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record - int16_t ret = enqueueMapRef_.unlock(i->drid_); - if (ret < enq_map::EMAP_OK) { // fail + if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND std::ostringstream oss; oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_; @@ -669,8 +668,8 @@ bool RecoveryManager::getNextRecordHeader() throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader"); } std::string xid((char*)xidp, ar.xid_size()); - txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found - for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { + txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found + for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) { fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount(); } else { @@ -697,8 +696,8 @@ bool RecoveryManager::getNextRecordHeader() throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader"); } std::string xid((char*)xidp, cr.xid_size()); - txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found - for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { + txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found + for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) { // txn enqueue //std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail @@ -766,8 +765,8 @@ void RecoveryManager::prepareRecordList() { std::vector<std::string> xidList; transactionMapRef_.xid_list(xidList); for (std::vector<std::string>::const_iterator j=xidList.begin(); j!=xidList.end(); ++j) { - qpid::linearstore::journal::txn_data_list tdsl = transactionMapRef_.get_tdata_list(*j); - for (qpid::linearstore::journal::tdl_itr k=tdsl.begin(); k!=tdsl.end(); ++k) { + qpid::linearstore::journal::txn_data_list_t tdsl = transactionMapRef_.get_tdata_list(*j); + for (qpid::linearstore::journal::tdl_itr_t k=tdsl.begin(); k!=tdsl.end(); ++k) { if (k->enq_flag_) { recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true)); } diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcfg.h b/qpid/cpp/src/qpid/linearstore/journal/jcfg.h index 83c61bcb5f..82cff99169 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jcfg.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jcfg.h @@ -43,8 +43,8 @@ #define QLS_WMGR_MAXWAITUS 100 /**< Max. wait time (us) before submitting AIO */ #define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */ -#define QLS_TXA_MAGIC 0x61534c51 /**< ("RHMa" in little endian) Magic for dtx abort hdrs */ -#define QLS_TXC_MAGIC 0x63534c51 /**< ("RHMc" in little endian) Magic for dtx commit hdrs */ +#define QLS_TXA_MAGIC 0x61534c51 /**< ("QLSa" in little endian) Magic for dtx abort hdrs */ +#define QLS_TXC_MAGIC 0x63534c51 /**< ("QLSc" in little endian) Magic for dtx commit hdrs */ #define QLS_DEQ_MAGIC 0x64534c51 /**< ("QLSd" in little endian) Magic for deq rec hdrs */ #define QLS_ENQ_MAGIC 0x65534c51 /**< ("QLSe" in little endian) Magic for enq rec hdrs */ #define QLS_FILE_MAGIC 0x66534c51 /**< ("QLSf" in little endian) Magic for file hdrs */ diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp index 304ab0f6ee..3ada3d2504 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp @@ -51,6 +51,38 @@ txn_data_t::txn_data_t(const uint64_t rid, aio_compl_(false) {} +txn_op_stats_t::txn_op_stats_t(const txn_data_list_t& tdl) : + enqCnt(0U), + deqCnt(0U), + tpcCnt(0U), + abortCnt(0U), + commitCnt(0U), + rid(0ULL) +{ + for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end(); ++i) { + if (i->enq_flag_) { + ++enqCnt; + rid = i->rid_; + } else { + ++deqCnt; + if (i->commit_flag_) { + ++commitCnt; + } else { + ++abortCnt; + } + } + if (i->tpc_flag_) { + ++tpcCnt; + } + } + if (tpcCnt > 0 && tpcCnt != tdl.size()) { + throw jexception("Inconsistent 2PC count"); // TODO: complete exception details + } + if (abortCnt > 0 && commitCnt > 0) { + throw jexception("Both abort and commit in same transaction"); // TODO: complete exception details + } +} + txn_map::txn_map(): _map()/*, _pfid_txn_cnt()*/ @@ -66,7 +98,7 @@ txn_map::insert_txn_data(const std::string& xid, const txn_data_t& td) xmap_itr itr = _map.find(xid); if (itr == _map.end()) // not found in map { - txn_data_list list; + txn_data_list_t list; list.push_back(td); std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list)); if (!ret.second) // duplicate @@ -77,14 +109,14 @@ txn_map::insert_txn_data(const std::string& xid, const txn_data_t& td) return ok; } -const txn_data_list +const txn_data_list_t txn_map::get_tdata_list(const std::string& xid) { slock s(_mutex); return get_tdata_list_nolock(xid); } -const txn_data_list +const txn_data_list_t txn_map::get_tdata_list_nolock(const std::string& xid) { xmap_itr itr = _map.find(xid); @@ -93,14 +125,14 @@ txn_map::get_tdata_list_nolock(const std::string& xid) return itr->second; } -const txn_data_list +const txn_data_list_t txn_map::get_remove_tdata_list(const std::string& xid) { slock s(_mutex); xmap_itr itr = _map.find(xid); if (itr == _map.end()) // not found in map return _empty_data_list; - txn_data_list list = itr->second; + txn_data_list_t list = itr->second; _map.erase(itr); return list; } @@ -132,7 +164,7 @@ txn_map::cnt(const bool enq_flag) uint32_t c = 0; for (xmap_itr i = _map.begin(); i != _map.end(); i++) { - for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) + for (tdl_itr_t j = i->second.begin(); j < i->second.end(); j++) { if (j->enq_flag_ == enq_flag) c++; @@ -149,7 +181,7 @@ txn_map::is_txn_synced(const std::string& xid) if (itr == _map.end()) // not found in map return TMAP_XID_NOT_FOUND; bool is_synced = true; - for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++) + for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++) { if (!litr->aio_compl_) { @@ -167,7 +199,7 @@ txn_map::set_aio_compl(const std::string& xid, const uint64_t rid) xmap_itr itr = _map.find(xid); if (itr == _map.end()) // xid not found in map return TMAP_XID_NOT_FOUND; - for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++) + for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++) { if (litr->rid_ == rid) { @@ -185,8 +217,8 @@ txn_map::data_exists(const std::string& xid, const uint64_t rid) bool found = false; { slock s(_mutex); - txn_data_list tdl = get_tdata_list_nolock(xid); - tdl_itr itr = tdl.begin(); + txn_data_list_t tdl = get_tdata_list_nolock(xid); + tdl_itr_t itr = tdl.begin(); while (itr != tdl.end() && !found) { found = itr->rid_ == rid; @@ -204,8 +236,8 @@ txn_map::is_enq(const uint64_t rid) slock s(_mutex); for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++) { - txn_data_list list = i->second; - for (tdl_itr j = list.begin(); j < list.end() && !found; j++) + txn_data_list_t list = i->second; + for (tdl_itr_t j = list.begin(); j < list.end() && !found; j++) { if (j->enq_flag_) found = j->rid_ == rid; diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h index a99f9170c7..996d54bdac 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h @@ -53,8 +53,20 @@ namespace journal { const bool tpc_flag, const bool commit_flag); } txn_data_t; - typedef std::vector<txn_data_t> txn_data_list; - typedef txn_data_list::iterator tdl_itr; + typedef std::vector<txn_data_t> txn_data_list_t; + typedef txn_data_list_t::iterator tdl_itr_t; + typedef txn_data_list_t::const_iterator tdl_const_itr_t; + + typedef struct txn_op_stats_t + { + uint16_t enqCnt; + uint16_t deqCnt; + uint16_t tpcCnt; + uint16_t abortCnt; + uint16_t commitCnt; + uint64_t rid; + txn_op_stats_t(const txn_data_list_t& tdl); + } txn_op_stats_t; /** * \class txn_map @@ -102,21 +114,21 @@ namespace journal { static int16_t TMAP_SYNCED; private: - typedef std::pair<std::string, txn_data_list> xmap_param; - typedef std::map<std::string, txn_data_list> xmap; + typedef std::pair<std::string, txn_data_list_t> xmap_param; + typedef std::map<std::string, txn_data_list_t> xmap; typedef xmap::iterator xmap_itr; xmap _map; smutex _mutex; - const txn_data_list _empty_data_list; + const txn_data_list_t _empty_data_list; public: txn_map(); virtual ~txn_map(); bool insert_txn_data(const std::string& xid, const txn_data_t& td); - const txn_data_list get_tdata_list(const std::string& xid); - const txn_data_list get_remove_tdata_list(const std::string& xid); + const txn_data_list_t get_tdata_list(const std::string& xid); + const txn_data_list_t get_remove_tdata_list(const std::string& xid); bool in_map(const std::string& xid); uint32_t enq_cnt(); uint32_t deq_cnt(); @@ -130,7 +142,7 @@ namespace journal { void xid_list(std::vector<std::string>& xv); private: uint32_t cnt(const bool enq_flag); - const txn_data_list get_tdata_list_nolock(const std::string& xid); + const txn_data_list_t get_tdata_list_nolock(const std::string& xid); }; }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c b/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c index 144ce4125a..b55c1c16c8 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c @@ -41,6 +41,6 @@ bool is_txn_coml_commit(const deq_hdr_t *dh) { } void set_txn_coml_commit(deq_hdr_t *dh, const bool commit) { - dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK : - dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK); + dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK : // set flag bit + dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK); // unset flag bit } diff --git a/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c index 07f4719a8e..71a506fedf 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c @@ -75,6 +75,7 @@ void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) { void file_hdr_reset(file_hdr_t* target) { target->_rhdr._uflag = 0; + target->_rhdr._serial = 0; target->_rhdr._rid = 0; target->_fro = 0; target->_ts_sec = 0; @@ -85,6 +86,7 @@ void file_hdr_reset(file_hdr_t* target) { int is_file_hdr_reset(file_hdr_t* target) { return target->_rhdr._uflag == 0 && + target->_rhdr._serial == 0 && target->_rhdr._rid == 0 && target->_ts_sec == 0 && target->_ts_nsec == 0 && diff --git a/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h index 53ca686fb8..2b2a3ffa3f 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h @@ -51,7 +51,7 @@ extern "C"{ * +---+---+---+---+---+---+---+---+ | * | rid | | * +---+---+---+---+---+---+---+---+ -+ - * | fs | partn | reserved | + * | fhs | partn | reserved | * +---+---+---+---+---+---+---+---+ * | data-size | * +---+---+---+---+---+---+---+---+ @@ -70,7 +70,7 @@ extern "C"{ * * ver = Journal version * rid = Record ID - * fs = File header size in sblks (defined by JRNL_SBLK_SIZE) + * fhs = File header size in sblks (defined by JRNL_SBLK_SIZE) * partn = EFP partition from which this file came * fro = First Record Offset * qnl = Length of the queue name in octets. diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp index 78e1cc599c..c246837a8d 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -426,8 +426,8 @@ wmgr::abort(data_tok* dtokp, // Delete this txn from tmap, unlock any locked records in emap std::string xid((const char*)xid_ptr, xid_len); - txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found - for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) + txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (!itr->enq_flag_) _emap.unlock(itr->drid_); // ignore rid not found error @@ -525,8 +525,8 @@ wmgr::commit(data_tok* dtokp, // Delete this txn from tmap, process records into emap std::string xid((const char*)xid_ptr, xid_len); - txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found - for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) + txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) // txn enqueue { |
