diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-11-14 20:39:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-11-14 20:39:32 +0000 |
| commit | 32b480e3a6a1173195bc1a322b3a14852ca03068 (patch) | |
| tree | 4f888832787daed51b3e09a57187b101f269e60c /qpid/cpp/src | |
| parent | 20b2cbd6fa546671ab6b3255a1f9f1e7385c60ea (diff) | |
| download | qpid-python-32b480e3a6a1173195bc1a322b3a14852ca03068.tar.gz | |
QPID-4984: Fix for recovery ambiguity issue, other code tidy-ups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1542066 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
81 files changed, 1217 insertions, 1929 deletions
diff --git a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp index 67b8b52c6c..7863940534 100644 --- a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp @@ -115,7 +115,7 @@ u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const // For zero value, use default p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); - } else if ( p > 128 || p & (p-1) ) { + } else if ( p > 128 || (p & (p-1)) ) { // For any positive value that is not a power of 2, use closest value if (p < 6) p = 4; else if (p < 12) p = 8; diff --git a/qpid/cpp/src/qpid/linearstore/BindingDbt.h b/qpid/cpp/src/qpid/linearstore/BindingDbt.h index 7297ea7c7c..e5d61de248 100644 --- a/qpid/cpp/src/qpid/linearstore/BindingDbt.h +++ b/qpid/cpp/src/qpid/linearstore/BindingDbt.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_BINDINGDBT_H -#define QPID_LEGACYSTORE_BINDINGDBT_H +#ifndef QPID_LINEARSTORE_BINDINGDBT_H +#define QPID_LINEARSTORE_BINDINGDBT_H #include "db-inc.h" #include "qpid/broker/PersistableExchange.h" @@ -53,4 +53,4 @@ public: }} -#endif // ifndef QPID_LEGACYSTORE_BINDINGDBT_H +#endif // ifndef QPID_LINEARSTORE_BINDINGDBT_H diff --git a/qpid/cpp/src/qpid/linearstore/BufferValue.h b/qpid/cpp/src/qpid/linearstore/BufferValue.h index 5af1ac6a43..daeb81306a 100644 --- a/qpid/cpp/src/qpid/linearstore/BufferValue.h +++ b/qpid/cpp/src/qpid/linearstore/BufferValue.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_BUFFERVALUE_H -#define QPID_LEGACYSTORE_BUFFERVALUE_H +#ifndef QPID_LINEARSTORE_BUFFERVALUE_H +#define QPID_LINEARSTORE_BUFFERVALUE_H #include "db-inc.h" #include "qpid/broker/Persistable.h" @@ -43,4 +43,4 @@ public: }} -#endif // ifndef QPID_LEGACYSTORE_BUFFERVALUE_H +#endif // ifndef QPID_LINEARSTORE_BUFFERVALUE_H diff --git a/qpid/cpp/src/qpid/linearstore/Cursor.h b/qpid/cpp/src/qpid/linearstore/Cursor.h index c7e7f34faa..0287803b21 100644 --- a/qpid/cpp/src/qpid/linearstore/Cursor.h +++ b/qpid/cpp/src/qpid/linearstore/Cursor.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_CURSOR_H -#define QPID_LEGACYSTORE_CURSOR_H +#ifndef QPID_LINEARSTORE_CURSOR_H +#define QPID_LINEARSTORE_CURSOR_H #include <boost/shared_ptr.hpp> #include "db-inc.h" @@ -47,4 +47,4 @@ public: }} -#endif // ifndef QPID_LEGACYSTORE_CURSOR_H +#endif // ifndef QPID_LINEARSTORE_CURSOR_H diff --git a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h index 17de01d237..154dacc5cb 100644 --- a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h +++ b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H -#define QPID_LEGACYSTORE_DATATOKENIMPL_H +#ifndef QPID_LINEARSTORE_DATATOKENIMPL_H +#define QPID_LINEARSTORE_DATATOKENIMPL_H #include "qpid/linearstore/jrnl/data_tok.h" #include "qpid/broker/PersistableMessage.h" @@ -29,7 +29,7 @@ namespace qpid{ namespace linearstore{ -class DataTokenImpl : public qpid::qls_jrnl::data_tok, public qpid::RefCounted +class DataTokenImpl : public qpid::linearstore::journal::data_tok, public qpid::RefCounted { private: boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg; @@ -44,4 +44,4 @@ class DataTokenImpl : public qpid::qls_jrnl::data_tok, public qpid::RefCounted } // namespace msgstore } // namespace mrg -#endif // ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H +#endif // ifndef QPID_LINEARSTORE_DATATOKENIMPL_H diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 0d15744694..7bbb9dc6a4 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -3,8 +3,8 @@ LinearStore issues: Store: ------ -1. Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record start, - no way of discriminating old from new at boundary (used to use OWI). +1. (SOLVED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record + start, no way of discriminating old from new at boundary (used to use OWI). 2. Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve #1 first. @@ -12,6 +12,9 @@ Store: 4. Rework qpid management parameters and controls. +5. Consistent logging: rework logging to provide uniform and consistent logging from store (both logging level and + places where logging occurs). + Tests ----- diff --git a/qpid/cpp/src/qpid/linearstore/IdDbt.h b/qpid/cpp/src/qpid/linearstore/IdDbt.h index f8bb0647db..c7264491ab 100644 --- a/qpid/cpp/src/qpid/linearstore/IdDbt.h +++ b/qpid/cpp/src/qpid/linearstore/IdDbt.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_IDDBT_H -#define QPID_LEGACYSTORE_IDDBT_H +#ifndef QPID_LINEARSTORE_IDDBT_H +#define QPID_LINEARSTORE_IDDBT_H #include "db-inc.h" @@ -39,4 +39,4 @@ public: }} -#endif // ifndef QPID_LEGACYSTORE_IDDBT_H +#endif // ifndef QPID_LINEARSTORE_IDDBT_H diff --git a/qpid/cpp/src/qpid/linearstore/IdSequence.h b/qpid/cpp/src/qpid/linearstore/IdSequence.h index 11b31a2813..17996eec52 100644 --- a/qpid/cpp/src/qpid/linearstore/IdSequence.h +++ b/qpid/cpp/src/qpid/linearstore/IdSequence.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_IDSEQUENCE_H -#define QPID_LEGACYSTORE_IDSEQUENCE_H +#ifndef QPID_LINEARSTORE_IDSEQUENCE_H +#define QPID_LINEARSTORE_IDSEQUENCE_H #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" @@ -40,4 +40,4 @@ public: }} -#endif // ifndef QPID_LEGACYSTORE_IDSEQUENCE_H +#endif // ifndef QPID_LINEARSTORE_IDSEQUENCE_H diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index 4f904665ee..354fca4f7e 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -36,7 +36,7 @@ #include "qmf/org/apache/qpid/linearstore/EventFull.h" #include "qmf/org/apache/qpid/linearstore/EventRecovered.h" -using namespace qpid::qls_jrnl; +using namespace qpid::linearstore::journal; using namespace qpid::linearstore; using qpid::management::ManagementAgent; namespace _qmf = qmf::org::apache::qpid::linearstore; @@ -128,10 +128,10 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) void -JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_, +JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - qpid::qls_jrnl::aio_callback* const cbp) + qpid::linearstore::journal::aio_callback* const cbp) { // efpp->createJournal(_jdir); // QLS_LOG2(notice, _jid, "Initialized"); @@ -167,10 +167,10 @@ JournalImpl::recover(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles, const uint32_t jfsize_sblks,*/ - boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm, + boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - qpid::qls_jrnl::aio_callback* const cbp, + qpid::linearstore::journal::aio_callback* const cbp, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, uint64_t& highest_rid, uint64_t queue_id) @@ -463,12 +463,12 @@ JournalImpl::handleIoResult(const iores r) writeActivityFlag = true; switch (r) { - case qpid::qls_jrnl::RHM_IORES_SUCCESS: + case qpid::linearstore::journal::RHM_IORES_SUCCESS: return; default: { std::ostringstream oss; - oss << "Unexpected I/O response (" << qpid::qls_jrnl::iores_str(r) << ")."; + oss << "Unexpected I/O response (" << qpid::linearstore::journal::iores_str(r) << ")."; QLS_LOG2(error, _jid, oss.str()); THROW_STORE_FULL_EXCEPTION(oss.str()); } diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index ba62b1d64a..59050e3875 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -41,14 +41,14 @@ namespace qpid{ namespace sys { class Timer; } -namespace qls_jrnl { -class EmptyFilePool; -} namespace linearstore{ class JournalImpl; class JournalLogImpl; +namespace journal { + class EmptyFilePool; +} class InactivityFireEvent : public qpid::sys::TimerTask { @@ -74,7 +74,7 @@ class GetEventsFireEvent : public qpid::sys::TimerTask inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; } }; -class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jrnl::jcntl, public qpid::qls_jrnl::aio_callback +class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linearstore::journal::jcntl, public qpid::linearstore::journal::aio_callback { public: typedef boost::function<void (JournalImpl&)> DeleteCallback; @@ -110,26 +110,26 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void initManagement(qpid::management::ManagementAgent* agent); - void initialize(qpid::qls_jrnl::EmptyFilePool* efp, + void initialize(qpid::linearstore::journal::EmptyFilePool* efp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - qpid::qls_jrnl::aio_callback* const cbp); + qpid::linearstore::journal::aio_callback* const cbp); - inline void initialize(qpid::qls_jrnl::EmptyFilePool* efpp, + inline void initialize(qpid::linearstore::journal::EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks) { initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this); } - void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm, + void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - qpid::qls_jrnl::aio_callback* const cbp, + qpid::linearstore::journal::aio_callback* const cbp, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, uint64_t& highest_rid, uint64_t queue_id); - inline void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm, + inline void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, @@ -142,38 +142,38 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr // Overrides for write inactivity timer void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp, + const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient = false); - void enqueue_extern_data_record(const size_t tot_data_len, qpid::qls_jrnl::data_tok* dtokp, + void enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient = false); void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp, const std::string& xid, + const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient = false); - void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::qls_jrnl::data_tok* dtokp, + void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient = false); - void dequeue_data_record(qpid::qls_jrnl::data_tok* const dtokp, const bool txn_coml_commit = false); + void dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit = false); - void dequeue_txn_data_record(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false); + void dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false); - void txn_abort(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid); + void txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid); - void txn_commit(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid); + void txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid); void stop(bool block_till_aio_cmpl = false); // Overrides for get_events timer - qpid::qls_jrnl::iores flush(const bool block_till_aio_cmpl = false); + qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl = false); // TimerTask callback void getEventsFire(); void flushFire(); // AIO callbacks - virtual void wr_aio_cb(std::vector<qpid::qls_jrnl::data_tok*>& dtokl); + virtual void wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl); virtual void rd_aio_cb(std::vector<uint16_t>& pil); qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const @@ -194,7 +194,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr timer.add(getEventsFireEventsPtr); getEventsTimerSetFlag = true; } - void handleIoResult(const qpid::qls_jrnl::iores r); + void handleIoResult(const qpid::linearstore::journal::iores r); // Management instrumentation callbacks overridden from jcntl inline void instr_incr_outstanding_aio_cnt() { @@ -222,9 +222,9 @@ class TplJournalImpl : public JournalImpl virtual ~TplJournalImpl() {} // Special version of read_data_record that ignores transactions - needed when reading the TPL - inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize, + inline qpid::linearstore::journal::iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize, bool& transient, bool& external, - qpid::qls_jrnl::data_tok* const dtokp) { + qpid::linearstore::journal::data_tok* const dtokp) { return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true); } }; // class TplJournalImpl diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp index b7935b1509..85e977595d 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp @@ -24,11 +24,11 @@ namespace qpid { namespace linearstore { -JournalLogImpl::JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold) : qpid::qls_jrnl::JournalLog(logLevelThreshold) {} +JournalLogImpl::JournalLogImpl(const qpid::linearstore::journal::JournalLog::log_level_t logLevelThreshold) : qpid::linearstore::journal::JournalLog(logLevelThreshold) {} JournalLogImpl::~JournalLogImpl() {} void -JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level, +JournalLogImpl::log(const qpid::linearstore::journal::JournalLog::log_level_t level, const std::string& log_stmt) const { switch (level) { case LOG_CRITICAL: QPID_LOG(critical, "Linear Store: " << log_stmt); break; @@ -42,7 +42,7 @@ JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level, } void -JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level, +JournalLogImpl::log(const qpid::linearstore::journal::JournalLog::log_level_t level, const std::string& jid, const std::string& log_stmt) const { switch (level) { diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h index 43c2d7dc9c..538b383945 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_LOG_H -#define QPID_LEGACYSTORE_LOG_H +#ifndef QPID_LINEARSTORE_LOG_H +#define QPID_LINEARSTORE_LOG_H #include "qpid/linearstore/jrnl/JournalLog.h" #include "qpid/log/Statement.h" @@ -31,18 +31,18 @@ namespace qpid { namespace linearstore { -class JournalLogImpl : public qpid::qls_jrnl::JournalLog +class JournalLogImpl : public qpid::linearstore::journal::JournalLog { public: - JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold); + JournalLogImpl(const qpid::linearstore::journal::JournalLog::log_level_t logLevelThreshold); virtual ~JournalLogImpl(); - virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel, + virtual void log(const qpid::linearstore::journal::JournalLog::log_level_t logLevel, const std::string& logStatement) const; - virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel, + virtual void log(const qpid::linearstore::journal::JournalLog::log_level_t logLevel, const std::string& journalId, const std::string& logStatement) const; }; }} // namespace qpid::linearstore -#endif // QPID_LEGACYSTORE_LOG_H +#endif // QPID_LINEARSTORE_LOG_H diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 100c8925de..3d06adaa67 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -72,7 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en isInit(false), envPath(envpath_), broker(broker_), - jrnlLog(qpid::qls_jrnl::JournalLog::LOG_NOTICE), + jrnlLog(qpid::linearstore::journal::JournalLog::LOG_NOTICE), mgmtObject(), agent(0) {} @@ -119,13 +119,13 @@ uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_) } } -qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition_, +qpid::linearstore::journal::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition_, const std::string& /*paramName_*/) { // TODO: check against list of existing partitions, throw if not found return partition_; } -qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_, +qpid::linearstore::journal::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib_, const std::string& paramName_) { uint8_t rem = efpFileSizeKib_ % uint64_t(QLS_SBLK_SIZE_KIB); if (rem != 0) { @@ -171,8 +171,8 @@ bool MessageStoreImpl::init(const qpid::Options* options_) { // Extract and check options const StoreOptions* opts = static_cast<const StoreOptions*>(options_); - qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition"); - qpid::qls_jrnl::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); + qpid::linearstore::journal::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition"); + qpid::linearstore::journal::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); @@ -182,8 +182,8 @@ bool MessageStoreImpl::init(const qpid::Options* options_) // These params, taken from options, are assumed to be correct and verified bool MessageStoreImpl::init(const std::string& storeDir_, - qpid::qls_jrnl::efpPartitionNumber_t efpPartition_, - qpid::qls_jrnl::efpDataSize_kib_t efpFileSize_kib_, + qpid::linearstore::journal::efpPartitionNumber_t efpPartition_, + qpid::linearstore::journal::efpDataSize_kib_t efpFileSize_kib_, const bool truncateFlag_, uint32_t wCachePageSizeKib_, uint32_t tplWCachePageSizeKib_) @@ -230,7 +230,7 @@ void MessageStoreImpl::init() } try { - qpid::qls_jrnl::jdir::create_dir(getBdbBaseDir()); + qpid::linearstore::journal::jdir::create_dir(getBdbBaseDir()); dbenv.reset(new DbEnv(0)); dbenv->set_errpfx("linearstore"); @@ -282,7 +282,7 @@ void MessageStoreImpl::init() THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e); } catch (const StoreException&) { throw; - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG(error, "Journal Exception occurred while initializing store: " << e); THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what()); } catch (...) { @@ -291,7 +291,7 @@ void MessageStoreImpl::init() } } while (!isInit); - efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), + efpMgr.reset(new qpid::linearstore::journal::EmptyFilePoolManager(getStoreTopLevelDir(), defaultEfpPartitionNumber, defaultEfpFileSize_kib, jrnlLog)); @@ -337,9 +337,9 @@ void MessageStoreImpl::truncateInit() // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s). - qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir()); - qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir()); - qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir()); + qpid::linearstore::journal::jdir::delete_dir(getBdbBaseDir()); + qpid::linearstore::journal::jdir::delete_dir(getJrnlBaseDir()); + qpid::linearstore::journal::jdir::delete_dir(getTplBaseDir()); QLS_LOG(notice, "Store directory " << getStoreTopLevelDir() << " was truncated."); init(); } @@ -349,7 +349,7 @@ void MessageStoreImpl::chkTplStoreInit() // Prevent multiple threads from late-initializing the TPL qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { - qpid::qls_jrnl::jdir::create_dir(getTplBaseDir()); + qpid::linearstore::journal::jdir::create_dir(getTplBaseDir()); tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks); if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } @@ -379,7 +379,7 @@ MessageStoreImpl::~MessageStoreImpl() closeDbs(); } catch (const DbException& e) { QLS_LOG(error, "Error closing BDB databases: " << e.what()); - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG(error, "Error: " << e.what()); } catch (const std::exception& e) { QLS_LOG(error, "Error: " << e.what()); @@ -420,7 +420,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); try { jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks); - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what()); } try { @@ -432,28 +432,28 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, } } -qpid::qls_jrnl::EmptyFilePool* -MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_, - const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_) { - qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_); +qpid::linearstore::journal::EmptyFilePool* +MessageStoreImpl::getEmptyFilePool(const qpid::linearstore::journal::efpPartitionNumber_t efpPartitionNumber_, + const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib_) { + qpid::linearstore::journal::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_); if (efpp == 0) { std::ostringstream oss; oss << "Partition=" << efpPartitionNumber_ << "; EfpFileSize=" << efpFileSizeKib_ << " KiB"; - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool"); } return efpp; } -qpid::qls_jrnl::EmptyFilePool* +qpid::linearstore::journal::EmptyFilePool* MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { qpid::framing::FieldTable::ValuePtr value; - qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; + qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; value = args_.get("qpid.efp_partition"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); } - qpid::qls_jrnl::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; + qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; value = args_.get("qpid.efp_file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); @@ -694,7 +694,6 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, txn_list& prepared, message_index& messages) { - QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()"); Cursor queues; queues.open(queueDb, txn.get()); @@ -731,7 +730,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, long rcnt = 0L; // recovered msg count long idcnt = 0L; // in-doubt msg count uint64_t thisHighestRid = 0ULL; - jQueue->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); + jQueue->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting // from recovery of a store that has had its size changed externally by the resize utility. @@ -757,7 +756,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt); QLS_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt."); jQueue->recover_complete(); // start journal. - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what()); } //read all messages: done on a per queue basis if using Journal @@ -869,7 +868,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, long& rcnt, long& idcnt) { - QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue->getName() << "\""); size_t preambleLength = sizeof(uint32_t)/*header size*/; JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); @@ -894,11 +892,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, try { unsigned aio_sleep_cnt = 0; while (read) { - qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); + qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); switch (res) { - case qpid::qls_jrnl::RHM_IORES_SUCCESS: { + case qpid::linearstore::journal::RHM_IORES_SUCCESS: { msg_count++; qpid::broker::RecoverableMessage::shared_ptr msg; char* data = (char*)dbuff; @@ -948,11 +946,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } } else { // Enqueue and/or dequeue tx - qpid::qls_jrnl::txn_map& tmap = jc->get_txn_map(); - qpid::qls_jrnl::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found + qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map(); + qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found bool enq = false; bool deq = false; - for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { + for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { if (j->enq_flag_ && j->rid_ == rid) enq = true; else if (!j->enq_flag_ && j->drid_ == rid) deq = true; } @@ -977,21 +975,21 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, aio_sleep_cnt = 0; break; } - case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT: + case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT: if (++aio_sleep_cnt > MAX_AIO_SLEEPS) THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()"); ::usleep(AIO_SLEEP_TIME_US); break; - case qpid::qls_jrnl::RHM_IORES_EMPTY: + case qpid::linearstore::journal::RHM_IORES_EMPTY: read = false; break; // done with all messages. (add call in jrnl to test that _emap is empty.) default: std::ostringstream oss; - oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::qls_jrnl::iores_str(res); + oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::linearstore::journal::iores_str(res); THROW_STORE_EXCEPTION(oss.str()); } // switch } // while - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what()); } } @@ -1000,7 +998,7 @@ qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage( uint64_t /*messageId*/, unsigned& /*headerSize*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage"); } int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, @@ -1036,9 +1034,8 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, void MessageStoreImpl::readTplStore() { - QLS_LOG(info, "*** MessageStoreImpl::readTplStore()"); tplRecoverMap.clear(); - qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map(); + qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map(); DataTokenImpl dtok; void* dbuff = NULL; size_t dbuffSize = 0; void* xidbuff = NULL; size_t xidbuffSize = 0; @@ -1050,9 +1047,9 @@ void MessageStoreImpl::readTplStore() while (!done) { dtok.reset(); dtok.set_wstate(DataTokenImpl::ENQ); - qpid::qls_jrnl::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); + qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); switch (res) { - case qpid::qls_jrnl::RHM_IORES_SUCCESS: { + case qpid::linearstore::journal::RHM_IORES_SUCCESS: { // Every TPL record contains both data and an XID assert(dbuffSize>0); assert(xidbuffSize>0); @@ -1060,7 +1057,7 @@ void MessageStoreImpl::readTplStore() bool is2PC = *(static_cast<char*>(dbuff)) != 0; // Check transaction details; add to recover map - qpid::qls_jrnl::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found + qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found if (!txnList.empty()) { // xid found in tmap unsigned enqCnt = 0; unsigned deqCnt = 0; @@ -1070,7 +1067,7 @@ void MessageStoreImpl::readTplStore() // Note: will apply to both 1PC and 2PC transactions. bool commitFlag = true; - for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { + for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { if (j->enq_flag_) { rid = j->rid_; enqCnt++; @@ -1088,31 +1085,30 @@ void MessageStoreImpl::readTplStore() aio_sleep_cnt = 0; break; } - case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT: + case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT: if (++aio_sleep_cnt > MAX_AIO_SLEEPS) THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()"); ::usleep(AIO_SLEEP_TIME_US); break; - case qpid::qls_jrnl::RHM_IORES_EMPTY: + case qpid::linearstore::journal::RHM_IORES_EMPTY: done = true; break; // done with all messages. (add call in jrnl to test that _emap is empty.) default: std::ostringstream oss; - oss << "readTplStore(): Unexpected result from journal read: " << qpid::qls_jrnl::iores_str(res); + oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res); THROW_STORE_EXCEPTION(oss.str()); } // switch } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what()); } } void MessageStoreImpl::recoverTplStore() { - QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()"); - if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) { + if (qpid::linearstore::journal::jdir::exists(tplStorePtr->jrnl_dir())) { uint64_t thisHighestRid = 0ULL; - tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0); + tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0); if (highestRid == 0ULL) highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit @@ -1127,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore() void MessageStoreImpl::recoverLockedMappings(txn_list& txns) { - QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()"); if (!tplStorePtr->is_ready()) recoverTplStore(); @@ -1143,7 +1138,6 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns) void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) { - QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()"); if (tplStorePtr->is_ready()) { readTplStore(); } else { @@ -1158,18 +1152,18 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage"); } void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy"); } void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/, const std::string& /*data*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent"); } void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue*/, @@ -1178,7 +1172,7 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue uint64_t /*offset*/, uint32_t /*length*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent"); } void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) @@ -1193,7 +1187,7 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) // TODO: check if this result should be used... /*mrg::journal::iores res =*/ jc->flush(); } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() ); } } @@ -1268,7 +1262,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_, } else { THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL.")); } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue_->getName() + ": MessageStoreImpl::store() failed: " + e.what()); } @@ -1328,7 +1322,7 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, } else { jc->dequeue_txn_data_record(ddtokp.get(), tid); } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { ddtokp->release(); THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": async_dequeue() failed: " + e.what()); } diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index a136f0ba80..fd2ab603d1 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H -#define QPID_LEGACYSTORE_MESSAGESTOREIMPL_H +#ifndef QPID_LINEARSTORE_MESSAGESTOREIMPL_H +#define QPID_LINEARSTORE_MESSAGESTOREIMPL_H #include <iomanip> #include <string> @@ -133,8 +133,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem IdSequence generalIdSequence; IdSequence messageIdSequence; std::string storeDir; - qpid::qls_jrnl::efpPartitionNumber_t defaultEfpPartitionNumber; - qpid::qls_jrnl::efpDataSize_kib_t defaultEfpFileSize_kib; + qpid::linearstore::journal::efpPartitionNumber_t defaultEfpPartitionNumber; + qpid::linearstore::journal::efpDataSize_kib_t defaultEfpFileSize_kib; bool truncateFlag; uint32_t wCachePgSizeSblks; uint16_t wCacheNumPages; @@ -145,7 +145,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem const char* envPath; qpid::broker::Broker* broker; JournalLogImpl jrnlLog; - boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpMgr; + boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpMgr; qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject; qpid::management::ManagementAgent* agent; @@ -156,9 +156,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem const std::string& paramName/*, const uint16_t jrnlFsizePgs*/); static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB); - static qpid::qls_jrnl::efpPartitionNumber_t chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition, + static qpid::linearstore::journal::efpPartitionNumber_t chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition, const std::string& paramName); - static qpid::qls_jrnl::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKiB, + static qpid::linearstore::journal::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKiB, const std::string& paramName); void init(); @@ -233,8 +233,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // journal functions void createJrnlQueue(const qpid::broker::PersistableQueue& queue); std::string getJrnlDir(const std::string& queueName); - qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpDataSize_kib_t s); - qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args); + qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::linearstore::journal::efpPartitionNumber_t p, const qpid::linearstore::journal::efpDataSize_kib_t s); + qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args); std::string getStoreTopLevelDir(); std::string getJrnlBaseDir(); std::string getBdbBaseDir(); @@ -268,8 +268,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem bool init(const qpid::Options* options); bool init(const std::string& dir, - qpid::qls_jrnl::efpPartitionNumber_t efpPartition = defEfpPartition, - qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib, + qpid::linearstore::journal::efpPartitionNumber_t efpPartition = defEfpPartition, + qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib, const bool truncateFlag = false, uint32_t wCachePageSize = defWCachePageSizeKib, uint32_t tplWCachePageSize = defTplWCachePageSizeKib); @@ -365,4 +365,4 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem } // namespace msgstore } // namespace mrg -#endif // ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H +#endif // ifndef QPID_LINEARSTORE_MESSAGESTOREIMPL_H diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h index 9888342579..9e401c2a30 100644 --- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h +++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_PREPAREDTRANSACTION_H -#define QPID_LEGACYSTORE_PREPAREDTRANSACTION_H +#ifndef QPID_LINEARSTORE_PREPAREDTRANSACTION_H +#define QPID_LINEARSTORE_PREPAREDTRANSACTION_H #include <list> #include <map> @@ -72,4 +72,4 @@ struct PreparedTransaction }} -#endif // ifndef QPID_LEGACYSTORE_PREPAREDTRANSACTION_H +#endif // ifndef QPID_LINEARSTORE_PREPAREDTRANSACTION_H diff --git a/qpid/cpp/src/qpid/linearstore/StoreException.h b/qpid/cpp/src/qpid/linearstore/StoreException.h index 9accfd0bd9..5d577d5779 100644 --- a/qpid/cpp/src/qpid/linearstore/StoreException.h +++ b/qpid/cpp/src/qpid/linearstore/StoreException.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_STOREEXCEPTION_H -#define QPID_LEGACYSTORE_STOREEXCEPTION_H +#ifndef QPID_LINEARSTORE_STOREEXCEPTION_H +#define QPID_LINEARSTORE_STOREEXCEPTION_H #include "qpid/linearstore/IdDbt.h" #include <boost/format.hpp> @@ -53,4 +53,4 @@ public: }} -#endif // ifndef QPID_LEGACYSTORE_STOREEXCEPTION_H +#endif // ifndef QPID_LINEARSTORE_STOREEXCEPTION_H diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp index 022d9c13e2..945f190608 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp @@ -52,7 +52,7 @@ void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) { } else { jc->txn_abort(dtokp.get(), getXid()); } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Error commit") + e.what()); } } @@ -104,10 +104,10 @@ void TxnCtxt::sync() { if (preparedXidStorePtr) jrnl_flush(preparedXidStorePtr); for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) - jrnl_sync(static_cast<JournalImpl*>(*i), &qpid::qls_jrnl::jcntl::_aio_cmpl_timeout); + jrnl_sync(static_cast<JournalImpl*>(*i), &qpid::linearstore::journal::jcntl::_aio_cmpl_timeout); if (preparedXidStorePtr) - jrnl_sync(preparedXidStorePtr, &qpid::qls_jrnl::jcntl::_aio_cmpl_timeout); - } catch (const qpid::qls_jrnl::jexception& e) { + jrnl_sync(preparedXidStorePtr, &qpid::linearstore::journal::jcntl::_aio_cmpl_timeout); + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") + e.what()); } } @@ -122,7 +122,7 @@ void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) { if (!jc || jc->is_txn_synced(getXid())) return; while (jc->get_wr_aio_evt_rem()) { - if (jc->get_wr_events(timeout) == qpid::qls_jrnl::jerrno::AIO_TIMEOUT && timeout) + if (jc->get_wr_events(timeout) == qpid::linearstore::journal::jerrno::AIO_TIMEOUT && timeout) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()")); } } diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h index 4c009d7a94..5a4350b128 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_TXNCTXT_H -#define QPID_LEGACYSTORE_TXNCTXT_H +#ifndef QPID_LINEARSTORE_TXNCTXT_H +#define QPID_LINEARSTORE_TXNCTXT_H #include "db-inc.h" #include <memory> @@ -112,6 +112,6 @@ class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext }} -#endif // ifndef QPID_LEGACYSTORE_TXNCTXT_H +#endif // ifndef QPID_LINEARSTORE_TXNCTXT_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h index 82d38e716e..7181a43857 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h @@ -19,14 +19,15 @@ * */ -#ifndef QPID_LINEARSTORE_ATOMICCOUNTER_H_ -#define QPID_LINEARSTORE_ATOMICCOUNTER_H_ +#ifndef QPID_LINEARSTORE_JOURNAL_ATOMICCOUNTER_H_ +#define QPID_LINEARSTORE_JOURNAL_ATOMICCOUNTER_H_ #include "qpid/linearstore/jrnl/slock.h" #include <string> namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { template <class T> class AtomicCounter @@ -127,6 +128,6 @@ public: } }; -}} // namespace qpid::qls_jrnl +}}} // namespace qpid::qls_jrnl -#endif // QPID_LINEARSTORE_ATOMICCOUNTER_H_ +#endif // QPID_LINEARSTORE_JOURNAL_ATOMICCOUNTER_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp index 015c2cbafe..da3d63f462 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp @@ -38,7 +38,8 @@ //#include <iostream> // DEBUG namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { EmptyFilePool::EmptyFilePool(const std::string& efpDirectory, const EmptyFilePoolPartition* partitionPtr, @@ -338,4 +339,4 @@ int EmptyFilePool::moveEmptyFile(const std::string& from, return 0; } -}} // namespace qpid::qls_jrnl +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h index cb6887a095..a866d0dea4 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h @@ -19,15 +19,14 @@ * */ -#ifndef QPID_QLS_JRNL_EMPTYFILEPOOL_H_ -#define QPID_QLS_JRNL_EMPTYFILEPOOL_H_ +#ifndef QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOL_H_ +#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOL_H_ namespace qpid { -namespace qls_jrnl { - +namespace linearstore { +namespace journal { class EmptyFilePool; - -}} // namespace qpid::qls_jrnl +}}} #include <deque> #include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" @@ -35,7 +34,8 @@ namespace qls_jrnl { #include <string> namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { class EmptyFilePoolPartition; class jdir; class JournalFile; @@ -92,6 +92,6 @@ protected: const std::string& toFqPath); }; -}} // namespace qpid::qls_jrnl +}}} -#endif /* QPID_QLS_JRNL_EMPTYFILEPOOL_H_ */ +#endif /* QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOL_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp index 3c2d7d69a2..4b8602318d 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp @@ -31,7 +31,8 @@ //#include <iostream> // DEBUG namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath, const efpPartitionNumber_t defaultPartitionNumber, @@ -92,8 +93,8 @@ void EmptyFilePoolManager::findEfpPartitions() { } journalLogRef_.log(JournalLog::LOG_NOTICE, "EFP Manager initialization complete"); - std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList; - std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList; + std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*> partitionList; + std::vector<qpid::linearstore::journal::EmptyFilePool*> filePoolList; getEfpPartitions(partitionList); if (partitionList.size() == 0) { journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible."); @@ -101,14 +102,14 @@ void EmptyFilePoolManager::findEfpPartitions() { std::stringstream oss; oss << "> EFP Partitions found: " << partitionList.size(); journalLogRef_.log(JournalLog::LOG_INFO, oss.str()); - for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) { + for (std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) { filePoolList.clear(); (*i)->getEmptyFilePools(filePoolList); std::stringstream oss; oss << " * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size() << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'"; journalLogRef_.log(JournalLog::LOG_INFO, oss.str()); - for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) { + for (std::vector<qpid::linearstore::journal::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) { std::ostringstream oss; oss << " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() << " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB"; @@ -208,4 +209,4 @@ uint16_t EmptyFilePoolManager::getNumEfpPartitions() const { return partitionMap_.size(); } -}} // namespace qpid::qls_jrnl +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h index d5fab82800..1b1f293a1c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h @@ -27,7 +27,8 @@ #include "qpid/linearstore/jrnl/smutex.h" namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { class EmptyFilePoolManager { @@ -66,6 +67,6 @@ public: uint16_t getNumEfpPartitions() const; }; -}} // namespace qpid::qls_jrnl +}}} #endif /* QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp index d38db60975..2d84ba4296 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp @@ -31,7 +31,8 @@ //#include <iostream> // DEBUG namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { // static const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition @@ -151,4 +152,4 @@ void EmptyFilePoolPartition::validatePartitionDir() { // TODO: other validity checks here } -}} // namespace qpid::qls_jrnl +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h index a9ca36d07b..9a2b5a5d75 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h @@ -19,15 +19,14 @@ * */ -#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ -#define QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ +#ifndef QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_ +#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_ namespace qpid { -namespace qls_jrnl { - +namespace linearstore { +namespace journal { class EmptyFilePoolPartition; - -}} // namespace qpid::qls_jrnl +}}} #include "qpid/linearstore/jrnl/EmptyFilePool.h" #include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" @@ -37,7 +36,8 @@ namespace qls_jrnl { #include <vector> namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { class JournalLog; class EmptyFilePoolPartition @@ -75,6 +75,6 @@ protected: void validatePartitionDir(); }; -}} // namespace qpid::qls_jrnl +}}} -#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ */ +#endif /* QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h index 2f9693fd95..d8e8225697 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h @@ -19,15 +19,16 @@ * */ -#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ -#define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ +#ifndef QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_ +#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_ #include <iostream> #include <stdint.h> #include <utility> // std::pair namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { typedef uint64_t efpDataSize_kib_t; ///< Size of data part of file (excluding file header) in kib typedef uint64_t efpFileSize_kib_t; ///< Size of file (header + data) in kib @@ -41,9 +42,10 @@ typedef struct efpIdentity_t { efpDataSize_kib_t ds_; efpIdentity_t() : pn_(0), ds_(0) {} efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {} + efpIdentity_t(const efpIdentity_t& ei) : pn_(ei.pn_), ds_(ei.ds_) {} friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; } } efpIdentity_t; -}} // namespace qpid::qls_jrnl +}}} -#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ */ +#endif /* QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp index 3de7b6c1e2..1e6c10eea7 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp @@ -31,18 +31,23 @@ //#include <iostream> // DEBUG namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { JournalFile::JournalFile(const std::string& fqFileName, - const ::file_hdr_t& fileHeader) : + const efpIdentity_t& efpIdentity, + const uint64_t fileSeqNum) : + efpIdentity_(efpIdentity), fqFileName_(fqFileName), - fileSeqNum_(fileHeader._file_number), + fileSeqNum_(fileSeqNum), + serial_(getRandom64()), + firstRecordOffset_(0ULL), fileHandle_(-1), fileCloseFlag_(false), fileHeaderBasePtr_ (0), fileHeaderPtr_(0), aioControlBlockPtr_(0), - fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -50,16 +55,18 @@ JournalFile::JournalFile(const std::string& fqFileName, {} JournalFile::JournalFile(const std::string& fqFileName, - const uint64_t fileSeqNum, - const efpDataSize_kib_t efpDataSize_kib) : + const ::file_hdr_t& fileHeader) : + efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib), fqFileName_(fqFileName), - fileSeqNum_(fileSeqNum), + fileSeqNum_(fileHeader._file_number), + serial_(fileHeader._rhdr._serial), + firstRecordOffset_(fileHeader._fro), fileHandle_(-1), fileCloseFlag_(false), fileHeaderBasePtr_ (0), fileHeaderPtr_(0), aioControlBlockPtr_(0), - fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -108,6 +115,10 @@ uint64_t JournalFile::getFileSeqNum() const { return fileSeqNum_; } +uint64_t JournalFile::getSerial() const { + return serial_; +} + int JournalFile::open() { fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r-- if (fileHandle_ < 0) { @@ -141,10 +152,12 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, const uint64_t recordId, const uint64_t firstRecordOffset, const std::string queueName) { + firstRecordOffset_ = firstRecordOffset; ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib); ::file_hdr_init(fileHeaderBasePtr_, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024, userFlags, + serial_, recordId, firstRecordOffset, fileSeqNum_, @@ -208,6 +221,18 @@ uint16_t JournalFile::decrOutstandingAioOperationCount() { return r; } +efpIdentity_t JournalFile::getEfpIdentity() const { + return efpIdentity_; +} + +uint64_t JournalFile::getFirstRecordOffset() const { + return firstRecordOffset_; +} + +void JournalFile::setFirstRecordOffset(const uint64_t firstRecordOffset) { + firstRecordOffset_ = firstRecordOffset; +} + // --- Status helper functions --- bool JournalFile::isEmpty() const { @@ -253,6 +278,22 @@ const std::string JournalFile::getFileName() const { return fqFileName_.substr(fqFileName_.rfind('/')+1); } +//static +uint64_t JournalFile::getRandom64() { + int randomData = ::open("/dev/random", O_RDONLY); + if (randomData < 0) { + throw jexception(); // TODO: Complete exception details + } + uint64_t randomNumber; + ::size_t size = sizeof(randomNumber); + ::ssize_t result = ::read(randomData, (char*)&randomNumber, size); + if (result < 0 || result != ssize_t(size)) { + throw jexception(); // TODO: Complete exception details + } + ::close(randomData); + return randomNumber; +} + bool JournalFile::isOpen() const { return fileHandle_ >= 0; } @@ -298,4 +339,4 @@ bool JournalFile::isFullAndComplete() const { } -}} // namespace qpid::qls_jrnl +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h index 0abbc7bcca..687b2c4866 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LINEARSTORE_JOURNALFILE_H_ -#define QPID_LINEARSTORE_JOURNALFILE_H_ +#ifndef QPID_LINEARSTORE_JOURNAL_JOURNALFILE_H_ +#define QPID_LINEARSTORE_JOURNAL_JOURNALFILE_H_ #include "qpid/linearstore/jrnl/aio.h" #include "qpid/linearstore/jrnl/AtomicCounter.h" @@ -31,13 +31,17 @@ class file_hdr_t; namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { class JournalFile { protected: + const efpIdentity_t efpIdentity_; const std::string fqFileName_; const uint64_t fileSeqNum_; + const uint64_t serial_; + uint64_t firstRecordOffset_; int fileHandle_; bool fileCloseFlag_; void* fileHeaderBasePtr_; @@ -51,11 +55,13 @@ protected: AtomicCounter<uint16_t> outstandingAioOpsCount_; ///< Outstanding AIO operations on this file public: + // Constructor for creating new file with known fileSeqNum and random serial JournalFile(const std::string& fqFileName, - const ::file_hdr_t& fileHeader); + const efpIdentity_t& efpIdentity, + const uint64_t fileSeqNum); + // Constructor for recovery in which fileSeqNum and serial are recovered from fileHeader param JournalFile(const std::string& fqFileName, - const uint64_t fileSeqNum, - const efpDataSize_kib_t efpDataSize_kib); + const ::file_hdr_t& fileHeader); virtual ~JournalFile(); void initialize(const uint32_t completedDblkCount); @@ -63,6 +69,7 @@ public: const std::string getFqFileName() const; uint64_t getFileSeqNum() const; + uint64_t getSerial() const; int open(); void close(); @@ -87,6 +94,10 @@ public: uint16_t getOutstandingAioOperationCount() const; uint16_t decrOutstandingAioOperationCount(); + efpIdentity_t getEfpIdentity() const; + uint64_t getFirstRecordOffset() const; + void setFirstRecordOffset(const uint64_t firstRecordOffset); + // Status helper functions bool isEmpty() const; ///< True if no writes of any kind have occurred bool isNoEnqueuedRecordsRemaining() const; ///< True when all enqueued records (or parts) have been dequeued @@ -97,6 +108,7 @@ public: protected: const std::string getDirectory() const; const std::string getFileName() const; + static uint64_t getRandom64(); bool isOpen() const; uint32_t getSubmittedDblkCount() const; @@ -114,6 +126,6 @@ protected: bool isFullAndComplete() const; ///< True if all submitted dblks have returned from AIO }; -}} // namespace qpid::qls_jrnl +}}} -#endif // QPID_LINEARSTORE_JOURNALFILE_H_ +#endif // QPID_LINEARSTORE_JOURNAL_JOURNALFILE_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp index 15222e3a47..96a6432624 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp @@ -23,7 +23,8 @@ #include <iostream> namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { JournalLog::JournalLog(log_level_t logLevelThreshold) : logLevelThreshold_(logLevelThreshold) {} @@ -58,4 +59,4 @@ const char* JournalLog::log_level_str(log_level_t logLevel) { return "<log level unknown>"; } -}} // namespace qpid::qls_jrnl +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h index b5fcdbc256..cf503cb9d2 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h @@ -19,13 +19,14 @@ * */ -#ifndef QPID_LINEARSTORE_JOURNALLOG_H_ -#define QPID_LINEARSTORE_JOURNALLOG_H_ +#ifndef QPID_LINEARSTORE_JOURNAL_JOURNALLOG_H_ +#define QPID_LINEARSTORE_JOURNAL_JOURNALLOG_H_ #include <string> namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { class JournalLog { @@ -54,6 +55,6 @@ public: static const char* log_level_str(const log_level_t logLevel); }; -}} // namespace qpid::qls_jrnl +}}} -#endif // QPID_LINEARSTORE_JOURNALLOG_H_ +#endif // QPID_LINEARSTORE_JOURNAL_JOURNALLOG_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp index 2c1327d645..9daebaba80 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp @@ -32,7 +32,8 @@ //#include <iostream> // DEBUG namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { LinearFileController::LinearFileController(jcntl& jcntlRef) : jcntlRef_(jcntlRef), @@ -90,7 +91,7 @@ void LinearFileController::pullEmptyFileFromEfp() { currentJournalFilePtr_->close(); std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name //std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG - addJournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0); + addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); } void LinearFileController::purgeEmptyFilesToEfp() { @@ -149,6 +150,11 @@ uint64_t LinearFileController::getCurrentFileSeqNum() const { return currentJournalFilePtr_->getFileSeqNum(); } +uint64_t LinearFileController::getCurrentSerial() const { + assertCurrentJournalFileValid("getCurrentSerial"); + return currentJournalFilePtr_->getSerial(); +} + bool LinearFileController::isEmpty() const { assertCurrentJournalFileValid("isEmpty"); return currentJournalFilePtr_->isEmpty(); @@ -173,10 +179,10 @@ const std::string LinearFileController::status(const uint8_t indentDepth) const // --- protected functions --- void LinearFileController::addJournalFile(const std::string& fileName, + const efpIdentity_t& efpIdentity, const uint64_t fileNumber, - const uint32_t fileSize_kib, const uint32_t completedDblkCount) { - JournalFile* jfp = new JournalFile(fileName, fileNumber, fileSize_kib); + JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber); addJournalFile(jfp, completedDblkCount); } @@ -219,4 +225,4 @@ void LinearFileController::purgeEmptyFilesToEfpNoLock() { } } -}} // namespace qpid::qls_jrnl +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h index 83d360dad8..1e8d9c9200 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LINEARSTORE_LINEARFILECONTROLLER_H_ -#define QPID_LINEARSTORE_LINEARFILECONTROLLER_H_ +#ifndef QPID_LINEARSTORE_JOURNAL_LINEARFILECONTROLLER_H_ +#define QPID_LINEARSTORE_JOURNAL_LINEARFILECONTROLLER_H_ #include <deque> #include "qpid/linearstore/jrnl/AtomicCounter.h" @@ -31,7 +31,8 @@ typedef struct io_context* io_context_t; typedef struct iocb aio_cb; namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { class EmptyFilePool; class jcntl; @@ -90,6 +91,7 @@ public: uint32_t dataSize_dblks); uint64_t getCurrentFileSeqNum() const; + uint64_t getCurrentSerial() const; bool isEmpty() const; // Debug aid @@ -97,8 +99,8 @@ public: protected: void addJournalFile(const std::string& fileName, + const efpIdentity_t& efpIdentity, const uint64_t fileNumber, - const uint32_t fileSize_kib, const uint32_t completedDblkCount); void assertCurrentJournalFileValid(const char* const functionName) const; bool checkCurrentJournalFileValid() const; @@ -110,6 +112,6 @@ protected: typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr, const uint32_t completedDblkCount); -}} // namespace qpid::qls_jrnl +}}} -#endif // QPID_LINEARSTORE_LINEARFILECONTROLLER_H_ +#endif // QPID_LINEARSTORE_JOURNAL_LINEARFILECONTROLLER_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp index cba560750b..361ee1aeda 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp @@ -44,8 +44,8 @@ #include <vector> namespace qpid { -namespace qls_jrnl -{ +namespace linearstore { +namespace journal { RecoveryManager::RecoveryManager(const std::string& journalDirectory, const std::string& queuename, @@ -63,6 +63,7 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory, highestRecordId_(0ULL), highestFileNumber_(0ULL), lastFileFullFlag_(false), + currentSerial_(0), efpFileSize_kib_(0) {} @@ -154,9 +155,10 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, } enq_map::emap_data_struct_t eds; enqueueMapRef_.get_data(*recordIdListConstItr_, eds); - uint64_t fileNumber = eds._pfid; - currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); - getNextFile(false); + if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) { + getFile(eds._pfid, false); + } +//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG inFileStream_.seekg(eds._file_posn, std::ifstream::beg); if (!inFileStream_.good()) { @@ -174,7 +176,10 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, // check flags transient = ::is_enq_transient(&enqueueHeader); external = ::is_enq_external(&enqueueHeader); - +//char magicBuff[5]; // DEBUG +//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG +//magicBuff[4] = 0; // DEBUG +//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG // read xid xidSize = enqueueHeader._xidsize; *xidPtrPtr = ::malloc(xidSize); @@ -217,57 +222,75 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, } } -std::string RecoveryManager::toString(const std::string& jid, - bool compact) { +std::string RecoveryManager::toString(const std::string& jid) { std::ostringstream oss; - if (compact) { - oss << "Recovery journal analysis (jid=\"" << jid << "\"):"; - oss << " jfl=["; - for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) { - if (i!=fileNumberMap_.begin()) { - oss << " "; - } - std::string fqFileName = i->second->getFqFileName(); - oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1); - } - oss << "] ecl=[ "; - for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) { - if (j!=fileNumberMap_.begin()) { - oss << " "; - } - oss << j->second->getEnqueuedRecordCount(); + oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl; + oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; + oss << " Journal File List:" << std::endl; + for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { + std::string fqFileName = k->second->getFqFileName(); + oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; + } + oss << " Enqueue Counts: [ "; + for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) { + if (l != fileNumberMap_.begin()) { + oss << ", "; } - oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F"); - oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)"; - oss << " eo=0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)"; - oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec; - oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec; - oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F"); + oss << l->second->getEnqueuedRecordCount(); + } + oss << " ]" << std::endl; + oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; + oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ << + std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; + oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" << + (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; + oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; + oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl; + oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; + oss << " Enqueued records (txn & non-txn):" << std::endl; + return oss.str(); +} + +std::string RecoveryManager::toLog(const std::string& jid, const int indent) { + std::string indentStr(indent, ' '); + std::ostringstream oss; + oss << std::endl << indentStr << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl; + if (journalEmptyFlag_) { + oss << indentStr << "<Journal empty, no journal files found>" << std::endl; } else { - oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl; - oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; - oss << " Journal File List:" << std::endl; + oss << indentStr << std::setw(7) << "file_id" + << std::setw(43) << "file_name" + << std::setw(16) << "fro" + << std::setw(12) << "record_cnt" + << std::setw(5) << "ptn" + << std::setw(10) << "efp" + << std::endl; + oss << indentStr << std::setw(7) << "-------" + << std::setw(43) << "-----------------------------------------" + << std::setw(16) << "--------------" + << std::setw(12) << "----------" + << std::setw(5) << "---" + << std::setw(10) << "--------" + << std::endl; for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { std::string fqFileName = k->second->getFqFileName(); - oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; - } - oss << " Enqueue Counts: [ " << std::endl; - for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) { - if (l != fileNumberMap_.begin()) { - oss << ", "; - } - oss << l->second->getEnqueuedRecordCount(); + std::ostringstream fro; + fro << std::hex << "0x" << k->second->getFirstRecordOffset(); + oss << indentStr << std::setw(7) << k->first + << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1) + << std::setw(16) << fro.str() + << std::setw(12) << k->second->getEnqueuedRecordCount() + << std::setw(5) << k->second->getEfpIdentity().pn_ + << std::setw(9) << k->second->getEfpIdentity().ds_ << "k" + << std::endl; } - oss << " ]" << std::endl; - oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; - oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ << + oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; - oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" << + oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; - oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; - oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl; - oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; - oss << " Enqueued records (txn & non-txn):" << std::endl; + oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; + oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; + oss << indentStr << "Enqueued records (txn & non-txn):"; } return oss.str(); } @@ -355,9 +378,9 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) } bool RecoveryManager::decodeRecord(jrec& record, - std::size_t& cumulativeSizeRead, - ::rec_hdr_t& headerRecord, - std::streampos& fileOffset) + std::size_t& cumulativeSizeRead, + ::rec_hdr_t& headerRecord, + std::streampos& fileOffset) { std::streampos start_file_offs = fileOffset; @@ -370,15 +393,18 @@ bool RecoveryManager::decodeRecord(jrec& record, bool done = false; while (!done) { try { - done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead); + done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead); } catch (const jexception& e) { + journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what()); checkJournalAlignment(start_file_offs); return false; } - if (!done && !getNextFile(false)) { - checkJournalAlignment(start_file_offs); - return false; + if (!done && needNextFile()) { + if (!getNextFile(false)) { + checkJournalAlignment(start_file_offs); + return false; + } } } return true; @@ -392,45 +418,50 @@ uint64_t RecoveryManager::getCurrentFileNumber() const { return currentJournalFileConstItr_->first; } -bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { +bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) { if (inFileStream_.is_open()) { - if (inFileStream_.eof() || !inFileStream_.good()) { - inFileStream_.clear(); - endOffset_ = inFileStream_.tellg(); // remember file offset before closing - if (endOffset_ == -1) { - std::ostringstream oss; - oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); - throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile"); - } - inFileStream_.close(); - if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { - return false; - } - } - } - if (!inFileStream_.is_open()) { + inFileStream_.close(); +//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ - inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); - if (!inFileStream_.good()) { - throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile"); - } + } + currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); + if (currentJournalFileConstItr_ == fileNumberMap_.end()) { + return false; + } + inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); + if (!inFileStream_.good()) { + throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getFile"); + } +//std::cout << " [F=" << getCurrentFileName() << std::flush; // DEBUG - // Read file header - file_hdr_t fhdr; - inFileStream_.read((char*)&fhdr, sizeof(fhdr)); - checkFileStreamOk(true); - if (fhdr._rhdr._magic == QLS_FILE_MAGIC) { - firstRecordOffset_ = fhdr._fro; - std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES; - inFileStream_.seekg(foffs); - } else { - inFileStream_.close(); - if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { - journalEmptyFlag_ = true; - } + if (!readFileHeader()) { + return false; + } + std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES; + inFileStream_.seekg(foffs); + return true; +} + +bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { + if (inFileStream_.is_open()) { + inFileStream_.close(); +//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG + if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { return false; } + inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ + } + inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); + if (!inFileStream_.good()) { + throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile"); + } +//std::cout << " [.F=" << getCurrentFileName() << std::flush; // DEBUG + + if (!readFileHeader()) { + return false; } + std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES; + inFileStream_.seekg(foffs); return true; } @@ -443,7 +474,7 @@ bool RecoveryManager::getNextRecordHeader() bool hdr_ok = false; std::streampos file_pos; while (!hdr_ok) { - if (!inFileStream_.is_open()) { + if (needNextFile()) { if (!getNextFile(true)) { return false; } @@ -458,8 +489,10 @@ bool RecoveryManager::getNextRecordHeader() if (inFileStream_.gcount() == sizeof(rec_hdr_t)) { hdr_ok = true; } else { - if (!getNextFile(true)) { - return false; + if (needNextFile()) { + if (!getNextFile(true)) { + return false; + } } } } @@ -468,6 +501,10 @@ bool RecoveryManager::getNextRecordHeader() case QLS_ENQ_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG + if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { + endOffset_ = file_pos; + return false; + } enq_rec er; uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary if (!decodeRecord(er, cum_size_read, h, file_pos)) { @@ -502,6 +539,10 @@ bool RecoveryManager::getNextRecordHeader() case QLS_DEQ_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG + if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { + endOffset_ = file_pos; + return false; + } deq_rec dr; uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary if (!decodeRecord(dr, cum_size_read, h, file_pos)) { @@ -534,6 +575,10 @@ bool RecoveryManager::getNextRecordHeader() case QLS_TXA_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG + if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { + endOffset_ = file_pos; + return false; + } txn_rec ar; if (!decodeRecord(ar, cum_size_read, h, file_pos)) { return false; @@ -558,6 +603,10 @@ bool RecoveryManager::getNextRecordHeader() case QLS_TXC_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG + if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { + endOffset_ = file_pos; + return false; + } txn_rec cr; if (!decodeRecord(cr, cum_size_read, h, file_pos)) { return false; @@ -593,8 +642,10 @@ bool RecoveryManager::getNextRecordHeader() uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t)); inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t)); checkFileStreamOk(false); - if (!getNextFile(false)) { - return false; + if (needNextFile()) { + if (!getNextFile(false)) { + return false; + } } } break; @@ -611,6 +662,13 @@ bool RecoveryManager::getNextRecordHeader() return true; } +bool RecoveryManager::needNextFile() { + if (inFileStream_.is_open()) { + return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024); + } + return true; +} + void RecoveryManager::readJournalData(char* target, const std::streamsize readSize) { std::streamoff bytesRead = 0; @@ -624,7 +682,9 @@ void RecoveryManager::readJournalData(char* target, inFileStream_.read(target + bytesRead, readSize - bytesRead); std::streamoff thisReadSize = inFileStream_.gcount(); if (thisReadSize < readSize) { - getNextFile(false); + if (needNextFile()) { + getNextFile(false); + } file_pos = inFileStream_.tellg(); if (file_pos == std::streampos(-1)) { std::ostringstream oss; @@ -636,6 +696,23 @@ void RecoveryManager::readJournalData(char* target, } } +bool RecoveryManager::readFileHeader() { + file_hdr_t fhdr; + inFileStream_.read((char*)&fhdr, sizeof(fhdr)); + checkFileStreamOk(true); + if (::file_hdr_check(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, efpFileSize_kib_) != 0) { + firstRecordOffset_ = fhdr._fro; + currentSerial_ = fhdr._rhdr._serial; + } else { + inFileStream_.close(); + if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { + journalEmptyFlag_ = true; + } + return false; + } + return true; +} + // static private void RecoveryManager::readJournalFileHeader(const std::string& journalFileName, ::file_hdr_t& fileHeaderRef, @@ -670,4 +747,4 @@ void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) { } } -}} // namespace qpid::qls_jrnl +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h index 3cacd7cfb3..7655eb6831 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_ -#define QPID_LINEARSTORE_RECOVERYSTATE_H_ +#ifndef QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_ +#define QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_ #include <deque> #include <fstream> @@ -33,7 +33,8 @@ struct file_hdr_t; struct rec_hdr_t; namespace qpid { -namespace qls_jrnl { +namespace linearstore { +namespace journal { class data_tok; class enq_map; @@ -72,6 +73,7 @@ protected: bool lastFileFullFlag_; ///< Last file is full // State for recovery of individual enqueued records + uint64_t currentSerial_; uint32_t efpFileSize_kib_; fileNumberMapConstItr_t currentJournalFileConstItr_; std::string currentFileName_; @@ -104,8 +106,8 @@ public: bool ignore_pending_txns); void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr); - std::string toString(const std::string& jid, - bool compact = true); + std::string toString(const std::string& jid); + std::string toLog(const std::string& jid, const int indent); protected: void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity); void checkFileStreamOk(bool checkEof); @@ -116,8 +118,11 @@ protected: std::streampos& fileOffset); std::string getCurrentFileName() const; uint64_t getCurrentFileNumber() const; + bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag); bool getNextFile(bool jumpToFirstRecordOffsetFlag); bool getNextRecordHeader(); + bool needNextFile(); + bool readFileHeader(); void readJournalData(char* target, const std::streamsize size); void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr); @@ -126,6 +131,6 @@ protected: std::string& queueName); }; -}} // namespace qpid::qls_jrnl +}}} -#endif // QPID_LINEARSTORE_RECOVERYSTATE_H_ +#endif // QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/aio.h b/qpid/cpp/src/qpid/linearstore/jrnl/aio.h index 7be5b83930..3a4a762439 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/aio.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/aio.h @@ -19,17 +19,16 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_AIO_H -#define QPID_LEGACYSTORE_JRNL_AIO_H +#ifndef QPID_LINEARSTORE_JOURNAL_AIO_H +#define QPID_LINEARSTORE_JOURNAL_AIO_H #include <libaio.h> #include <cstring> #include <string.h> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { typedef iocb aio_cb; typedef io_event aio_event; @@ -135,6 +134,6 @@ public: } }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_AIO_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h b/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h index 975d114f1f..f21b62617b 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h @@ -19,27 +19,26 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H -#define QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H +#ifndef QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H +#define QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H #include <stdint.h> #include <vector> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { - class data_tok; +class data_tok; - class aio_callback - { - public: - virtual ~aio_callback() {} - virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0; - virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0; - }; +class aio_callback +{ +public: + virtual ~aio_callback() {} + virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0; + virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0; +}; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h b/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h index c22dcb8c1c..243c817946 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_CVAR_H -#define QPID_LEGACYSTORE_JRNL_CVAR_H +#ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H +#define QPID_LINEARSTORE_JOURNAL_CVAR_H #include <cstring> #include "qpid/linearstore/jrnl/jerrno.h" @@ -30,10 +30,9 @@ #include <pthread.h> #include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { // Ultra-simple thread condition variable class class cvar @@ -71,6 +70,6 @@ namespace qls_jrnl } }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_CVAR_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp index ab2a763f17..a07a89b02a 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp @@ -27,10 +27,9 @@ #include "qpid/linearstore/jrnl/slock.h" #include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { // Static members @@ -39,10 +38,8 @@ smutex data_tok::_mutex; data_tok::data_tok(): _wstate(NONE), -// _rstate(UNREAD), _dsize(0), _dblks_written(0), -// _dblks_read(0), _pg_cnt(0), _fid(0), _rid(0), @@ -106,58 +103,12 @@ data_tok::wstate_str(write_state wstate) return "<wstate unknown>"; } -/* -const char* -data_tok::rstate_str() const -{ - return rstate_str(_rstate); -} -*/ - -/* -const char* -data_tok::rstate_str(read_state rstate) -{ - switch (rstate) - { - case NONE: - return "NONE"; - case READ_PART: - return "READ_PART"; - case SKIP_PART: - return "SKIP_PART"; - case READ: - return "READ"; - // Not using default: forces compiler to ensure all cases are covered. - } - return "<rstate unknown>"; -} -*/ - -/* -void -data_tok::set_rstate(const read_state rstate) -{ - if (_wstate != ENQ && rstate != UNREAD) - { - std::ostringstream oss; - oss << "Attempted to change read state to " << rstate_str(rstate); - oss << " while write state is not enqueued (wstate ENQ); wstate=" << wstate_str() << "."; - throw jexception(jerrno::JERR_DTOK_ILLEGALSTATE, oss.str(), "data_tok", - "set_rstate"); - } - _rstate = rstate; -} -*/ - void data_tok::reset() { _wstate = NONE; -// _rstate = UNREAD; _dsize = 0; _dblks_written = 0; -// _dblks_read = 0; _pg_cnt = 0; _fid = 0; _rid = 0; @@ -185,4 +136,4 @@ data_tok::status_str() const return oss.str(); } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h index 39dc1c4a81..f727243cb0 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h @@ -19,15 +19,14 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H -#define QPID_LEGACYSTORE_JRNL_DATA_TOK_H +#ifndef QPID_LINEARSTORE_JOURNAL_DATA_TOK_H +#define QPID_LINEARSTORE_JOURNAL_DATA_TOK_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class data_tok; -}} +}}} #include <cassert> #include <cstddef> @@ -35,10 +34,9 @@ class data_tok; #include <pthread.h> #include <string> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { /** * \class data_tok @@ -72,25 +70,13 @@ namespace qls_jrnl COMMITTED }; -/* - enum read_state - { - UNREAD, ///< Data block not read - READ_PART, ///< Data block is part-read; waiting for page buffer to fill - SKIP_PART, ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill - READ ///< Data block is fully read - }; -*/ - protected: static smutex _mutex; static uint64_t _cnt; uint64_t _icnt; write_state _wstate; ///< Enqueued / dequeued state of data -// read_state _rstate; ///< Read state of data std::size_t _dsize; ///< Data size in bytes uint32_t _dblks_written; ///< Data blocks read/written -// uint32_t _dblks_read; ///< Data blocks read/written uint32_t _pg_cnt; ///< Page counter - incr for each page containing part of data uint64_t _fid; ///< FID containing header of enqueue record uint64_t _rid; ///< RID of data set by enqueue operation @@ -106,16 +92,11 @@ namespace qls_jrnl inline write_state wstate() const { return _wstate; } const char* wstate_str() const; static const char* wstate_str(write_state wstate); -// inline read_state rstate() const { return _rstate; } -// const char* rstate_str() const; -// static const char* rstate_str(read_state rstate); inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; } inline bool is_enqueued() const { return _wstate == ENQ; } inline bool is_readable() const { return _wstate == ENQ; } -// inline bool is_read() const { return _rstate == READ; } inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; } inline void set_wstate(const write_state wstate) { _wstate = wstate; } -// void set_rstate(const read_state rstate); inline std::size_t dsize() const { return _dsize; } inline void set_dsize(std::size_t dsize) { _dsize = dsize; } @@ -124,10 +105,6 @@ namespace qls_jrnl { _dblks_written += dblks_written; } inline void set_dblocks_written(uint32_t dblks_written) { _dblks_written = dblks_written; } -// inline uint32_t dblocks_read() const { return _dblks_read; } -// inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; } -// inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; } - inline uint32_t pg_cnt() const { return _pg_cnt; } inline uint32_t incr_pg_cnt() { return ++_pg_cnt; } inline uint32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; } @@ -154,7 +131,6 @@ namespace qls_jrnl std::string status_str() const; }; -} // namespace qls_jrnl -} // namespace jrnl +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_DATA_TOK_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp index 5b251b7908..ca8de61607 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp @@ -19,87 +19,43 @@ * */ -/** - * \file deq_rec.cpp - * - * Qpid asynchronous store plugin library - * - * This file contains the code for the mrg::journal::deq_rec (journal dequeue - * record) class. See comments in file deq_rec.h for details. - * - * \author Kim van der Riet - */ - #include "qpid/linearstore/jrnl/deq_rec.h" -#include "qpid/linearstore/jrnl/utils/deq_hdr.h" -#include "qpid/linearstore/jrnl/utils/rec_tail.h" #include <cassert> -#include <cerrno> -#include <cstdlib> #include <cstring> #include <iomanip> -#include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/jexception.h" -#include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { deq_rec::deq_rec(): -// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false), _xidp(0), _buff(0) -// _deq_tail(_deq_hdr) { - ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0); + ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0); ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0); } -deq_rec::deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp, - const std::size_t xidlen, const bool txn_coml_commit): -// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, rid, drid, xidlen, owi, txn_coml_commit), - _xidp(xidp), - _buff(0) -// _deq_tail(_deq_hdr) -{ - ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, rid, drid, xidlen); - ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0); - ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit); -} - deq_rec::~deq_rec() { clean(); } void -deq_rec::reset() -{ - _deq_hdr._rhdr._rid = 0; -// _deq_hdr.set_owi(false); - ::set_txn_coml_commit(&_deq_hdr, false); - _deq_hdr._deq_rid = 0; - _deq_hdr._xidsize = 0; - _deq_tail._checksum = 0; - _deq_tail._rid = 0; - _xidp = 0; - _buff = 0; -} - -void -deq_rec::reset(const uint64_t rid, const uint64_t drid, const void* const xidp, - const std::size_t xidlen, const bool txn_coml_commit) +deq_rec::reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp, + const std::size_t xidlen, const bool txn_coml_commit) { + _deq_hdr._rhdr._serial = serial; _deq_hdr._rhdr._rid = rid; ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit); _deq_hdr._deq_rid = drid; _deq_hdr._xidsize = xidlen; - _deq_tail._rid = rid; _xidp = xidp; _buff = 0; + _deq_tail._serial = serial; + _deq_tail._rid = rid; } uint32_t @@ -214,132 +170,16 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) return size_dblks(wr_cnt); } -uint32_t -deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) -{ - assert(rptr != 0); - assert(max_size_dblks > 0); - - std::size_t rd_cnt = 0; - if (rec_offs_dblks) // Continuation of record on new page - { - const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize); - const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize + - sizeof(rec_tail_t)); - const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; - - if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks) - { - // Remainder of xid fits within this page - if (rec_offs - sizeof(deq_hdr_t) < _deq_hdr._xidsize) - { - // Part of xid still outstanding, copy remainder of xid and tail - const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t); - const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs; - std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); - rd_cnt = xid_rem; - std::memcpy((void*)&_deq_tail, ((char*)rptr + rd_cnt), sizeof(_deq_tail)); - chk_tail(); - rd_cnt += sizeof(_deq_tail); - } - else - { - // Tail or part of tail only outstanding, complete tail - const std::size_t tail_offs = rec_offs - sizeof(deq_hdr_t) - _deq_hdr._xidsize; - const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs; - std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem); - chk_tail(); - rd_cnt = tail_rem; - } - } - else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks) - { - // Remainder of xid fits within this page, tail split - const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t); - const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs; - std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); - rd_cnt += xid_rem; - const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - if (tail_rem) - { - std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem); - rd_cnt += tail_rem; - } - } - else - { - // Remainder of xid split - const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES); - std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size); - rd_cnt += xid_cp_size; - } - } - else // Start of record - { - // Get and check header - //_deq_hdr.hdr_copy(h); - ::rec_hdr_copy(&_deq_hdr._rhdr, &h); - rd_cnt = sizeof(rec_hdr_t); - _deq_hdr._deq_rid = *(uint64_t*)((char*)rptr + rd_cnt); - rd_cnt += sizeof(uint64_t); - _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt); - rd_cnt = sizeof(deq_hdr_t); - chk_hdr(); - if (_deq_hdr._xidsize) - { - _buff = std::malloc(_deq_hdr._xidsize); - MALLOC_CHK(_buff, "_buff", "deq_rec", "decode"); - const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize); - const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize + - sizeof(rec_tail_t)); - - // Check if record (header + xid + tail) fits within this page, we can check the - // tail before the expense of copying data to memory - if (hdr_xid_tail_dblks <= max_size_dblks) - { - // Entire header, xid and tail fits within this page - std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize); - rd_cnt += _deq_hdr._xidsize; - std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, sizeof(_deq_tail)); - rd_cnt += sizeof(_deq_tail); - chk_tail(); - } - else if (hdr_xid_dblks <= max_size_dblks) - { - // Entire header and xid fit within this page, tail split - std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize); - rd_cnt += _deq_hdr._xidsize; - const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - if (tail_rem) - { - std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem); - rd_cnt += tail_rem; - } - } - else - { - // Header fits within this page, xid split - const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size); - rd_cnt += xid_cp_size; - } - } - } - return size_dblks(rd_cnt); -} - bool -deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) +deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { + uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { //_deq_hdr.hdr_copy(h); ::rec_hdr_copy(&_deq_hdr._rhdr, &h); - ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(uint64_t)); - ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t)); -#if defined(JRNL_32_BIT) - ifsp->ignore(sizeof(uint32_t)); // _filler0 -#endif + ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(_deq_hdr._deq_rid)); + ifsp->read((char*)&_deq_hdr._xidsize, sizeof(_deq_hdr._xidsize)); rec_offs = sizeof(_deq_hdr); // Read header, allocate (if req'd) for xid if (_deq_hdr._xidsize) @@ -382,9 +222,21 @@ deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) } } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); - if (_deq_hdr._xidsize) - chk_tail(); // Throws if tail invalid or record incomplete assert(!ifsp->fail() && !ifsp->bad()); + if (_deq_hdr._xidsize) { + int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, checksum); + if (res != 0) { + std::stringstream oss; + switch (res) { + case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break; + case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break; + case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break; + case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _deq_tail._checksum; break; + default: oss << "Unknown error " << res; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info + } + } return true; } @@ -427,38 +279,9 @@ deq_rec::rec_size() const } void -deq_rec::chk_hdr() const -{ - jrec::chk_hdr(_deq_hdr._rhdr); - if (_deq_hdr._rhdr._magic != QLS_DEQ_MAGIC) - { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rhdr._rid; - oss << ": expected=0x" << std::setw(8) << QLS_DEQ_MAGIC; - oss << " read=0x" << std::setw(2) << (int)_deq_hdr._rhdr._magic; - throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "deq_rec", "chk_hdr"); - } -} - -void -deq_rec::chk_hdr(uint64_t rid) const -{ - chk_hdr(); - jrec::chk_rid(_deq_hdr._rhdr, rid); -} - -void -deq_rec::chk_tail() const -{ - jrec::chk_tail(_deq_tail, _deq_hdr._rhdr); -} - -void deq_rec::clean() { // clean up allocated memory here } -} // namespace journal -} // namespace mrg +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h index 12ba6af855..b4941895be 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h @@ -19,73 +19,51 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H -#define QPID_LEGACYSTORE_JRNL_DEQ_REQ_H +#ifndef QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H +#define QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H -namespace qpid -{ -namespace qls_jrnl -{ -class deq_rec; -}} - -#include <cstddef> -#include "qpid/linearstore/jrnl/utils/deq_hdr.h" #include "qpid/linearstore/jrnl/jrec.h" +#include "qpid/linearstore/jrnl/utils/deq_hdr.h" +#include "qpid/linearstore/jrnl/utils/rec_tail.h" -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { - /** - * \class deq_rec - * \brief Class to handle a single journal dequeue record. - */ - class deq_rec : public jrec - { - private: - deq_hdr_t _deq_hdr; ///< Dequeue header - const void* _xidp; ///< xid pointer for encoding (writing to disk) - void* _buff; ///< Pointer to buffer to receive data read from disk - rec_tail_t _deq_tail; ///< Record tail, only encoded if XID is present +/** +* \class deq_rec +* \brief Class to handle a single journal dequeue record. +*/ +class deq_rec : public jrec +{ +private: + ::deq_hdr_t _deq_hdr; ///< Local instance of dequeue header struct + const void* _xidp; ///< xid pointer for encoding (writing to disk) + void* _buff; ///< Pointer to buffer to receive data read from disk + ::rec_tail_t _deq_tail; ///< Local instance of enqueue tail struct, only encoded if XID is present - public: - // constructor used for read operations and xid will have memory allocated - deq_rec(); - // constructor used for write operations, where xid already exists - deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp, - const std::size_t xidlen, const bool txn_coml_commit); - virtual ~deq_rec(); +public: + deq_rec(); + virtual ~deq_rec(); - // Prepare instance for use in reading data from journal - void reset(); - // Prepare instance for use in writing data to journal - void reset(const uint64_t rid, const uint64_t drid, const void* const xidp, - const std::size_t xidlen, const bool txn_coml_commit); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); - uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, - uint32_t max_size_dblks); - // Decode used for recover - bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs); + void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp, + const std::size_t xidlen, const bool txn_coml_commit); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); - inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); } - inline uint64_t rid() const { return _deq_hdr._rhdr._rid; } - inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; } - std::size_t get_xid(void** const xidpp); - std::string& str(std::string& str) const; - inline std::size_t data_size() const { return 0; } // This record never carries data - std::size_t xid_size() const; - std::size_t rec_size() const; + inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); } + inline uint64_t rid() const { return _deq_hdr._rhdr._rid; } + inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; } + std::size_t get_xid(void** const xidpp); + std::string& str(std::string& str) const; + inline std::size_t data_size() const { return 0; } // This record never carries data + std::size_t xid_size() const; + std::size_t rec_size() const; - private: - virtual void chk_hdr() const; - virtual void chk_hdr(uint64_t rid) const; - virtual void chk_tail() const; - virtual void clean(); - }; // class deq_rec +private: + virtual void clean(); +}; -} // namespace journal -} // namespace mrg +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp index 3bc49502c8..010968bdf7 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp @@ -27,10 +27,9 @@ #include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { // static return/error codes int16_t enq_map::EMAP_DUP_RID = -3; @@ -183,4 +182,4 @@ enq_map::pfid_list(std::vector<uint64_t>& fv) } } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h index 659e3bd052..aff839a022 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h @@ -19,15 +19,14 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H -#define QPID_LEGACYSTORE_JRNL_ENQ_MAP_H +#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H +#define QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class enq_map; -}} +}}} #include "qpid/linearstore/jrnl/jexception.h" #include "qpid/linearstore/jrnl/smutex.h" @@ -35,10 +34,9 @@ class enq_map; #include <pthread.h> #include <vector> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { /** * \class enq_map @@ -107,6 +105,6 @@ namespace qls_jrnl void pfid_list(std::vector<uint64_t>& fv); }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp index e968320ac6..8096d309f9 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp @@ -22,45 +22,22 @@ #include "qpid/linearstore/jrnl/enq_rec.h" #include <cassert> -#include <cerrno> -#include <cstdlib> #include <cstring> #include <iomanip> -#include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/jexception.h" -#include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { -// Constructor used for read operations, where buf contains preallocated space to receive data. enq_rec::enq_rec(): jrec(), // superclass - //_enq_hdr(QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false, false), _xidp(0), _data(0), _buff(0) - //_enq_tail(_enq_hdr) -{ - ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false); - ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0); -} - -// Constructor used for transactional write operations, where dbuf contains data to be written. -enq_rec::enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen, - const void* const xidp, const std::size_t xidlen, const bool transient): - jrec(), // superclass - //_enq_hdr(QLS_ENQ_MAGIC, QLS_JRNL_VERSION, rid, xidlen, dlen, owi, transient), - _xidp(xidp), - _data(dbuf), - _buff(0) - //_enq_tail(_enq_hdr) { - ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, rid, xidlen, dlen); + ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false); ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0); - ::set_enq_transient(&_enq_hdr, transient); } enq_rec::~enq_rec() @@ -68,28 +45,11 @@ enq_rec::~enq_rec() clean(); } -// Prepare instance for use in reading data from journal, where buf contains preallocated space -// to receive data. -void -enq_rec::reset() -{ - _enq_hdr._rhdr._rid = 0; - ::set_enq_transient(&_enq_hdr, false); - _enq_hdr._xidsize = 0; - _enq_hdr._dsize = 0; - _xidp = 0; - _data = 0; - _buff = 0; - _enq_tail._rid = 0; -} - -// Prepare instance for use in writing transactional data to journal, where dbuf contains data to -// be written. void -enq_rec::reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen, - const void* const xidp, const std::size_t xidlen, const bool transient, - const bool external) +enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen, + const void* const xidp, const std::size_t xidlen, const bool transient, const bool external) { + _enq_hdr._rhdr._serial = serial; _enq_hdr._rhdr._rid = rid; ::set_enq_transient(&_enq_hdr, transient); ::set_enq_external(&_enq_hdr, external); @@ -98,6 +58,7 @@ enq_rec::reset(const uint64_t rid, const void* const dbuf, const std::size_t dle _xidp = xidp; _data = dbuf; _buff = 0; + _enq_tail._serial = serial; _enq_tail._rid = rid; } @@ -246,217 +207,19 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) return size_dblks(wr_cnt); } -uint32_t -enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) -{ - assert(rptr != 0); - assert(max_size_dblks > 0); - - std::size_t rd_cnt = 0; - if (rec_offs_dblks) // Continuation of record on new page - { - const uint32_t hdr_xid_data_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize + - (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize); - const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t); - const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size); - const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size); - const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; - const std::size_t offs = rec_offs - sizeof(enq_hdr_t); - - if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks) - { - // Remainder of record fits within this page - if (offs < _enq_hdr._xidsize) - { - // some XID still outstanding, copy remainder of XID, data and tail - const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs; - std::memcpy((char*)_buff + offs, rptr, rem); - rd_cnt += rem; - std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail)); - chk_tail(); - rd_cnt += sizeof(_enq_tail); - } - else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) - { - // some data still outstanding, copy remainder of data and tail - const std::size_t data_offs = offs - _enq_hdr._xidsize; - const std::size_t data_rem = _enq_hdr._dsize - data_offs; - std::memcpy((char*)_buff + offs, rptr, data_rem); - rd_cnt += data_rem; - std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail)); - chk_tail(); - rd_cnt += sizeof(_enq_tail); - } - else - { - // Tail or part of tail only outstanding, complete tail - const std::size_t tail_offs = rec_offs - sizeof(enq_hdr_t) - _enq_hdr._xidsize - - _enq_hdr._dsize; - const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs; - std::memcpy((char*)&_enq_tail + tail_offs, rptr, tail_rem); - chk_tail(); - rd_cnt = tail_rem; - } - } - else if (hdr_data_dblks - rec_offs_dblks <= max_size_dblks) - { - // Remainder of xid & data fits within this page; tail split - - /* - * TODO: This section needs revision. Since it is known that the end of the page falls within the - * tail record, it is only necessary to write from the current offset to the end of the page under - * all circumstances. The multiple if/else combinations may be eliminated, as well as one memcpy() - * operation. - * - * Also note that Coverity has detected a possible memory overwrite in this block. It occurs if - * both the following two if() stmsts (numbered) are false. With rd_cnt = 0, this would result in - * the value of tail_rem > sizeof(tail_rec). Practically, this could only happen if the start and - * end of a page both fall within the same tail record, in which case the tail would have to be - * (much!) larger. However, the logic here does not account for this possibility. - * - * If the optimization above is undertaken, this code would probably be removed. - */ - if (offs < _enq_hdr._xidsize) // 1 - { - // some XID still outstanding, copy remainder of XID and data - const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs; - std::memcpy((char*)_buff + offs, rptr, rem); - rd_cnt += rem; - } - else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) // 2 - { - // some data still outstanding, copy remainder of data - const std::size_t data_offs = offs - _enq_hdr._xidsize; - const std::size_t data_rem = _enq_hdr._dsize - data_offs; - std::memcpy((char*)_buff + offs, rptr, data_rem); - rd_cnt += data_rem; - } - const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - if (tail_rem) - { - std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem); - rd_cnt += tail_rem; - } - } - else - { - // Since xid and data are contiguous, both fit within current page - copy whole page - const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES); - std::memcpy((char*)_buff + offs, rptr, data_cp_size); - rd_cnt += data_cp_size; - } - } - else // Start of record - { - // Get and check header - //_enq_hdr.hdr_copy(h); - ::rec_hdr_copy(&_enq_hdr._rhdr, &h); - rd_cnt = sizeof(rec_hdr_t); - _enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt); - rd_cnt += sizeof(std::size_t); -#if defined(JRNL_32_BIT) - rd_cnt += sizeof(uint32_t); // Filler 0 -#endif - _enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt); - rd_cnt = sizeof(enq_hdr_t); - chk_hdr(); - if (_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize)) - { - _buff = std::malloc(_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize)); - MALLOC_CHK(_buff, "_buff", "enq_rec", "decode"); - - const uint32_t hdr_xid_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize; - const uint32_t hdr_xid_data_size = hdr_xid_size + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize); - const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t); - const uint32_t hdr_xid_dblks = size_dblks(hdr_xid_size); - const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size); - const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size); - // Check if record (header + data + tail) fits within this page, we can check the - // tail before the expense of copying data to memory - if (hdr_tail_dblks <= max_size_dblks) - { - // Header, xid, data and tail fits within this page - if (_enq_hdr._xidsize) - { - std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize); - rd_cnt += _enq_hdr._xidsize; - } - if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) - { - std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, - _enq_hdr._dsize); - rd_cnt += _enq_hdr._dsize; - } - std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, sizeof(_enq_tail)); - chk_tail(); - rd_cnt += sizeof(_enq_tail); - } - else if (hdr_data_dblks <= max_size_dblks) - { - // Header, xid and data fit within this page, tail split or separated - if (_enq_hdr._xidsize) - { - std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize); - rd_cnt += _enq_hdr._xidsize; - } - if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) - { - std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, - _enq_hdr._dsize); - rd_cnt += _enq_hdr._dsize; - } - const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - if (tail_rem) - { - std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem); - rd_cnt += tail_rem; - } - } - else if (hdr_xid_dblks <= max_size_dblks) - { - // Header and xid fits within this page, data split or separated - if (_enq_hdr._xidsize) - { - std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize); - rd_cnt += _enq_hdr._xidsize; - } - if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) - { - const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size); - rd_cnt += data_cp_size; - } - } - else - { - // Header fits within this page, xid split or separated - const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size); - rd_cnt += data_cp_size; - } - } - } - return size_dblks(rd_cnt); -} - bool -enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) +enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { + uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { // Read header, allocate (if req'd) for xid //_enq_hdr.hdr_copy(h); ::rec_hdr_copy(&_enq_hdr._rhdr, &h); - ifsp->read((char*)&_enq_hdr._xidsize, sizeof(std::size_t)); -#if defined(JRNL_32_BIT) - ifsp->ignore(sizeof(uint32_t)); // _filler0 -#endif - ifsp->read((char*)&_enq_hdr._dsize, sizeof(std::size_t)); -#if defined(JRNL_32_BIT) - ifsp->ignore(sizeof(uint32_t)); // _filler1 -#endif + ifsp->read((char*)&_enq_hdr._xidsize, sizeof(_enq_hdr._xidsize)); + ifsp->read((char*)&_enq_hdr._dsize, sizeof(_enq_hdr._dsize)); rec_offs = sizeof(_enq_hdr); - if (_enq_hdr._xidsize) + if (_enq_hdr._xidsize > 0) { _buff = std::malloc(_enq_hdr._xidsize); MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode"); @@ -517,8 +280,19 @@ enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) } } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); - chk_tail(); // Throws if tail invalid or record incomplete assert(!ifsp->fail() && !ifsp->bad()); + int res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, checksum); + if (res != 0) { + std::stringstream oss; + switch (res) { + case 1: oss << std::hex << "Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; break; + case 2: oss << std::hex << "Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial; break; + case 3: oss << std::hex << "Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid; break; + case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _enq_tail._checksum; break; + default: oss << "Unknown error " << res; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info + } return true; } @@ -578,44 +352,9 @@ enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool } void -enq_rec::set_rid(const uint64_t rid) -{ - _enq_hdr._rhdr._rid = rid; - _enq_tail._rid = rid; -} - -void -enq_rec::chk_hdr() const -{ - jrec::chk_hdr(_enq_hdr._rhdr); - if (_enq_hdr._rhdr._magic != QLS_ENQ_MAGIC) - { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "enq magic: rid=0x" << std::setw(16) << _enq_hdr._rhdr._rid; - oss << ": expected=0x" << std::setw(8) << QLS_ENQ_MAGIC; - oss << " read=0x" << std::setw(2) << (int)_enq_hdr._rhdr._magic; - throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "enq_rec", "chk_hdr"); - } -} - -void -enq_rec::chk_hdr(uint64_t rid) const -{ - chk_hdr(); - jrec::chk_rid(_enq_hdr._rhdr, rid); -} - -void -enq_rec::chk_tail() const -{ - jrec::chk_tail(_enq_tail, _enq_hdr._rhdr); -} - -void enq_rec::clean() { // clean up allocated memory here } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h index b3502d9ad6..24ed9e99e1 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h @@ -19,86 +19,54 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H -#define QPID_LEGACYSTORE_JRNL_ENQ_REC_H +#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_REC_H +#define QPID_LINEARSTORE_JOURNAL_ENQ_REC_H -namespace qpid -{ -namespace qls_jrnl -{ -class enq_rec; -}} - -#include <cstddef> -#include "qpid/linearstore/jrnl/utils/enq_hdr.h" #include "qpid/linearstore/jrnl/jrec.h" +#include "qpid/linearstore/jrnl/utils/enq_hdr.h" +#include "qpid/linearstore/jrnl/utils/rec_tail.h" -namespace qpid -{ -namespace qls_jrnl -{ - - /** - * \class enq_rec - * \brief Class to handle a single journal enqueue record. - */ - class enq_rec : public jrec - { - private: - enq_hdr_t _enq_hdr; - const void* _xidp; ///< xid pointer for encoding (for writing to disk) - const void* _data; ///< Pointer to data to be written to disk - void* _buff; ///< Pointer to buffer to receive data read from disk - rec_tail_t _enq_tail; - - public: - /** - * \brief Constructor used for read operations. - */ - enq_rec(); - - /** - * \brief Constructor used for write operations, where mbuf contains data to be written. - */ - enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen, - const void* const xidp, const std::size_t xidlen, const bool transient); +namespace qpid { +namespace linearstore { +namespace journal { - /** - * \brief Destructor - */ - virtual ~enq_rec(); +/** +* \class enq_rec +* \brief Class to handle a single journal enqueue record. +*/ +class enq_rec : public jrec +{ +private: + ::enq_hdr_t _enq_hdr; ///< Local instance of enqueue header struct + const void* _xidp; ///< xid pointer for encoding (for writing to disk) + const void* _data; ///< Pointer to data to be written to disk + void* _buff; ///< Pointer to buffer to receive data read from disk + ::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct - // Prepare instance for use in reading data from journal, xid and data will be allocated - void reset(); - // Prepare instance for use in writing data to journal - void reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen, - const void* const xidp, const std::size_t xidlen, const bool transient, - const bool external); +public: + enq_rec(); + virtual ~enq_rec(); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); - uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); - // Decode used for recover - bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs); + void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen, + const void* const xidp, const std::size_t xidlen, const bool transient, const bool external); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); - std::size_t get_xid(void** const xidpp); - std::size_t get_data(void** const datapp); - inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); } - inline bool is_external() const { return ::is_enq_external(&_enq_hdr); } - std::string& str(std::string& str) const; - inline std::size_t data_size() const { return _enq_hdr._dsize; } - inline std::size_t xid_size() const { return _enq_hdr._xidsize; } - std::size_t rec_size() const; - static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external); - inline uint64_t rid() const { return _enq_hdr._rhdr._rid; } - void set_rid(const uint64_t rid); + std::size_t get_xid(void** const xidpp); + std::size_t get_data(void** const datapp); + inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); } + inline bool is_external() const { return ::is_enq_external(&_enq_hdr); } + std::string& str(std::string& str) const; + inline std::size_t data_size() const { return _enq_hdr._dsize; } + inline std::size_t xid_size() const { return _enq_hdr._xidsize; } + std::size_t rec_size() const; + static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external); + inline uint64_t rid() const { return _enq_hdr._rhdr._rid; } - private: - void chk_hdr() const; - void chk_hdr(uint64_t rid) const; - void chk_tail() const; - virtual void clean(); - }; // class enq_rec +private: + virtual void clean(); +}; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENQ_REC_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h index 511a2a41ab..106f58cf5f 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h @@ -19,13 +19,12 @@ * */ -#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H -#define QPID_LINEARSTORE_JRNL_ENUMS_H +#ifndef QPID_LINEARSTORE_JOURNAL_ENUMS_H +#define QPID_LINEARSTORE_JOURNAL_ENUMS_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { // TODO: Change this to flags, as multiple of these conditions may exist simultaneously /** @@ -37,12 +36,7 @@ namespace qls_jrnl RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO. RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO. RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read. -// RHM_IORES_RCINVALID, ///< Read page cache is invalid (ie obsolete or uninitialized) -// RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached. -// RHM_IORES_FULL, ///< During write operations, the journal files are full. -// RHM_IORES_BUSY, ///< Another blocking operation is in progress. RHM_IORES_TXPENDING ///< Operation blocked by pending transaction. -// RHM_IORES_NOTIMPL ///< Function is not implemented. }; typedef _iores iores; @@ -54,46 +48,11 @@ namespace qls_jrnl case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT"; case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT"; case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY"; -// case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID"; -// case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH"; -// case RHM_IORES_FULL: return "RHM_IORES_FULL"; -// case RHM_IORES_BUSY: return "RHM_IORES_BUSY"; case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING"; -// case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL"; } return "<iores unknown>"; } -/* - enum _log_level - { - LOG_TRACE = 0, - LOG_DEBUG, - LOG_INFO, - LOG_NOTICE, - LOG_WARN, - LOG_ERROR, - LOG_CRITICAL - }; - typedef _log_level log_level_t; - - static inline const char* log_level_str(log_level_t ll) - { - switch (ll) - { - case LOG_TRACE: return "TRACE"; - case LOG_DEBUG: return "DEBUG"; - case LOG_INFO: return "INFO"; - case LOG_NOTICE: return "NOTICE"; - case LOG_WARN: return "WARN"; - case LOG_ERROR: return "ERROR"; - case LOG_CRITICAL: return "CRITICAL"; - } - return "<log level unknown>"; - } -*/ - - -}} +}}} -#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENUMS_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h index 432887e577..83c61bcb5f 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_JCFG_H -#define QPID_LEGACYSTORE_JRNL_JCFG_H +#ifndef QPID_QLS_JRNL_JCFG_H +#define QPID_QLS_JRNL_JCFG_H #define QLS_SBLK_SIZE_BYTES 4096 /**< Disk softblock size in bytes, should match size used on disk media */ #define QLS_AIO_ALIGN_BOUNDARY_BYTES QLS_SBLK_SIZE_BYTES /** Memory alignment boundary used for DMA */ @@ -55,4 +55,4 @@ #define QLS_CLEAN /**< If defined, writes QLS_CLEAN_CHAR to all filled areas on disk */ #define QLS_CLEAN_CHAR 0xff /**< Char used to clear empty space on disk */ -#endif /* ifndef QPID_LEGACYSTORE_JRNL_JCFG_H */ +#endif /* ifndef QPID_QLS_JRNL_JCFG_H */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp index f21deead5d..099c32b18d 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp @@ -38,10 +38,9 @@ #include <sstream> #include <unistd.h> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { #define AIO_CMPL_TIMEOUT_SEC 5 #define AIO_CMPL_TIMEOUT_NSEC 0 @@ -132,9 +131,9 @@ jcntl::recover(EmptyFilePoolManager* efpmp, _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr); highest_rid = _recoveryManager.getHighestRecordId(); - _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true)); + _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5)); _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber()); - _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController); + _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset())); @@ -430,4 +429,4 @@ std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::en return false; } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h index bb91eac569..76d2a9877d 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h @@ -19,15 +19,14 @@ * */ -#ifndef QPID_LINEARSTORE_JRNL_JCNTL_H -#define QPID_LINEARSTORE_JRNL_JCNTL_H +#ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H +#define QPID_LINEARSTORE_JOURNAL_JCNTL_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class jcntl; -}} +}}} #include <cstddef> #include <deque> @@ -38,10 +37,9 @@ namespace qls_jrnl #include "qpid/linearstore/jrnl/smutex.h" #include "qpid/linearstore/jrnl/wmgr.h" -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class EmptyFilePool; class EmptyFilePoolManager; @@ -570,6 +568,6 @@ namespace qls_jrnl bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp); }; -}} +}}} -#endif // ifndef QPID_LINEARSTORE_JRNL_JCNTL_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp index 977c75e37a..734493074f 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp @@ -32,10 +32,9 @@ #include <sys/stat.h> #include <unistd.h> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { jdir::jdir(const std::string& dirname/*, const std::string& _base_filename*/): _dirname(dirname)/*, @@ -476,4 +475,4 @@ operator<<(std::ostream& os, const jdir* jdirPtr) return os; } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h index 01a08c57fc..c13a5f5af0 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h @@ -19,25 +19,23 @@ * */ -#ifndef QPID_LINEARSTORE_JRNL_JDIR_H -#define QPID_LINEARSTORE_JRNL_JDIR_H +#ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H +#define QPID_LINEARSTORE_JOURNAL_JDIR_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class jdir; -}} +}}} //#include "qpid/linearstore/jrnl/jinf.h" #include <dirent.h> #include <string> #include <vector> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { /** * \class jdir @@ -364,6 +362,6 @@ namespace qls_jrnl static void close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_name); }; -}} +}}} -#endif // ifndef QPID_LINEARSTORE_JRNL_JDIR_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp index c8fd2f55fe..a68966030b 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp @@ -21,10 +21,9 @@ #include "qpid/linearstore/jrnl/jerrno.h" -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { std::map<uint32_t, const char*> jerrno::_err_map; std::map<uint32_t, const char*>::iterator jerrno::_err_map_itr; @@ -214,4 +213,4 @@ jerrno::err_msg(const uint32_t err_no) throw () return _err_map_itr->second; } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h index 4c7124ae22..5af0a7ada0 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h @@ -19,24 +19,22 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_JERRNO_H -#define QPID_LEGACYSTORE_JRNL_JERRNO_H +#ifndef QPID_LINEARSTORE_JOURNAL_JERRNO_H +#define QPID_LINEARSTORE_JOURNAL_JERRNO_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class jerrno; -}} +}}} #include <map> #include <stdint.h> #include <string> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { /** * \class jerrno @@ -144,6 +142,6 @@ namespace qls_jrnl static bool __init(); }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_JERRNO_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_JERRNO_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp index 38ef85ddb8..44e9142698 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp @@ -27,10 +27,9 @@ #define CATLEN(p) MAX_MSG_SIZE - std::strlen(p) - 1 -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { jexception::jexception() throw (): std::exception(), @@ -168,4 +167,4 @@ operator<<(std::ostream& os, const jexception* jePtr) return os; } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h b/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h index a3a34691df..712b357254 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h @@ -19,15 +19,14 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H -#define QPID_LEGACYSTORE_JRNL_JEXCEPTION_H +#ifndef QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H +#define QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class jexception; -}} +}}} #include <cerrno> #include <cstdio> @@ -70,10 +69,10 @@ class jexception; ::abort(); \ } -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { + /** * \class jexception * \brief Generic journal exception class @@ -121,6 +120,6 @@ namespace qls_jrnl friend std::ostream& operator<<(std::ostream& os, const jexception* jePtr); }; // class jexception -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp index 87e0b0c04c..45f0197f8b 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp @@ -24,84 +24,15 @@ #include <iomanip> #include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/jexception.h" +#include "qpid/linearstore/jrnl/utils/rec_hdr.h" +#include "qpid/linearstore/jrnl/utils/rec_tail.h" #include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { jrec::jrec() {} jrec::~jrec() {} -void -jrec::chk_hdr(const rec_hdr_t& hdr) -{ - if (hdr._magic == 0) - { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "enq magic NULL: rid=0x" << hdr._rid; - throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr"); - } - if (hdr._version != QLS_JRNL_VERSION) - { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "version: rid=0x" << hdr._rid; - oss << ": expected=0x" << std::setw(2) << (int)QLS_JRNL_VERSION; - oss << " read=0x" << std::setw(2) << (int)hdr._version; - throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr"); - } -//#if defined (JRNL_LITTLE_ENDIAN) -// uint8_t endian_flag = RHM_LENDIAN_FLAG; -//#else -// uint8_t endian_flag = RHM_BENDIAN_FLAG; -//#endif -// if (hdr._eflag != endian_flag) -// { -// std::ostringstream oss; -// oss << std::hex << std::setfill('0'); -// oss << "endian_flag: rid=" << hdr._rid; -// oss << ": expected=0x" << std::setw(2) << (int)endian_flag; -// oss << " read=0x" << std::setw(2) << (int)hdr._eflag; -// throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr"); -// } -} - -void -jrec::chk_rid(const rec_hdr_t& hdr, const uint64_t rid) -{ - if (hdr._rid != rid) - { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "rid mismatch: expected=0x" << rid; - oss << " read=0x" << hdr._rid; - throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr"); - } -} - -void -jrec::chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr) -{ - if (tail._xmagic != ~hdr._magic) - { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "magic: rid=0x" << hdr._rid; - oss << ": expected=0x" << ~hdr._magic; - oss << " read=0x" << tail._xmagic; - throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail"); - } - if (tail._rid != hdr._rid) - { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "rid: rid=0x" << hdr._rid; - oss << ": read=0x" << tail._rid; - throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail"); - } -} - -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h index 619cd0bdec..ef481cc3ed 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h @@ -19,152 +19,105 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_JREC_H -#define QPID_LEGACYSTORE_JRNL_JREC_H - -namespace qpid -{ -namespace qls_jrnl -{ -class jrec; -}} +#ifndef QPID_LINEARSTORE_JOURNAL_JREC_H +#define QPID_LINEARSTORE_JOURNAL_JREC_H #include <cstddef> #include <fstream> #include "qpid/linearstore/jrnl/jcfg.h" -#include "qpid/linearstore/jrnl/utils/rec_hdr.h" -#include "qpid/linearstore/jrnl/utils/rec_tail.h" +#include <stdint.h> #include <string> -namespace qpid -{ -namespace qls_jrnl +struct rec_hdr_t; +struct rec_tail_t; + +namespace qpid { +namespace linearstore { +namespace journal { + +/** +* \class jrec +* \brief Abstract class for all file jrecords, both data and log. This class establishes +* the common data format and structure for these jrecords. +*/ +class jrec { +public: + jrec(); + virtual ~jrec(); /** - * \class jrec - * \brief Abstract class for all file jrecords, both data and log. This class establishes - * the common data format and structure for these jrecords. + * \brief Encode this instance of jrec into the write buffer at the disk-block-aligned + * pointer wptr starting at position rec_offs_dblks in the encoded record to a + * maximum size of max_size_dblks. + * + * This call encodes the content of the data contianed in this instance of jrec into a + * disk-softblock-aligned (defined by JRNL_SBLK_SIZE) buffer pointed to by parameter + * wptr. No more than paramter max_size_dblks data-blocks may be written to the buffer. + * The parameter rec_offs_dblks is the offset in data-blocks within the fully encoded + * data block this instance represents at which to start encoding. + * + * Encoding entails writing the record header (struct enq_hdr), the data and the record tail + * (struct enq_tail). The record must be data-block-aligned (defined by JRNL_DBLK_SIZE), + * thus any remaining space in the final data-block is ignored; the returned value is the + * number of data-blocks consumed from the page by the encode action. Provided the initial + * alignment requirements are met, records may be of arbitrary size and may span multiple + * data-blocks, disk-blocks and/or pages. + * + * Since the record size in data-blocks is known, the general usage pattern is to call + * encode() as many times as is needed to fully encode the data. Each call to encode() + * will encode as much of the record as it can to what remains of the current page cache, + * and will return the number of data-blocks actually encoded. + * + * <b>Example:</b> Assume that record r1 was previously written to page 0, and that this + * is an instance representing record r2. Being larger than the page size ps, r2 would span + * multiple pages as follows: + * <pre> + * |<---ps--->| + * +----------+----------+----------+----... + * | |r2a| r2b | r2c | | + * |<-r1-><----------r2----------> | + * +----------+----------+----------+----... + * page: p0 p1 p2 + * </pre> + * Encoding record r2 will require multiple calls to encode; one for each page which + * is involved. Record r2 is divided logically into sections r2a, r2b and r2c at the + * points where the page boundaries intersect with the record. Assuming a page size + * of ps, the page boundary pointers are represented by their names p0, p1... and the + * sizes of the record segments are represented by their names r1, r2a, r2b..., the calls + * should be as follows: + * <pre> + * encode(p0+r1, 0, ps-r1); (returns r2a data-blocks) + * encode(p1, r2a, ps); (returns r2b data-blocks which equals ps) + * encode(p2, r2a+r2b, ps); (returns r2c data-blocks) + * </pre> + * + * \param wptr Data-block-aligned pointer to position in page buffer where encoding is to + * take place. + * \param rec_offs_dblks Offset in data-blocks within record from which to start encoding. + * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr. + * \returns Number of data-blocks encoded. */ - class jrec - { - public: - jrec(); - virtual ~jrec(); - - /** - * \brief Encode this instance of jrec into the write buffer at the disk-block-aligned - * pointer wptr starting at position rec_offs_dblks in the encoded record to a - * maximum size of max_size_dblks. - * - * This call encodes the content of the data contianed in this instance of jrec into a - * disk-softblock-aligned (defined by JRNL_SBLK_SIZE) buffer pointed to by parameter - * wptr. No more than paramter max_size_dblks data-blocks may be written to the buffer. - * The parameter rec_offs_dblks is the offset in data-blocks within the fully encoded - * data block this instance represents at which to start encoding. - * - * Encoding entails writing the record header (struct enq_hdr), the data and the record tail - * (struct enq_tail). The record must be data-block-aligned (defined by JRNL_DBLK_SIZE), - * thus any remaining space in the final data-block is ignored; the returned value is the - * number of data-blocks consumed from the page by the encode action. Provided the initial - * alignment requirements are met, records may be of arbitrary size and may span multiple - * data-blocks, disk-blocks and/or pages. - * - * Since the record size in data-blocks is known, the general usage pattern is to call - * encode() as many times as is needed to fully encode the data. Each call to encode() - * will encode as much of the record as it can to what remains of the current page cache, - * and will return the number of data-blocks actually encoded. - * - * <b>Example:</b> Assume that record r1 was previously written to page 0, and that this - * is an instance representing record r2. Being larger than the page size ps, r2 would span - * multiple pages as follows: - * <pre> - * |<---ps--->| - * +----------+----------+----------+----... - * | |r2a| r2b | r2c | | - * |<-r1-><----------r2----------> | - * +----------+----------+----------+----... - * page: p0 p1 p2 - * </pre> - * Encoding record r2 will require multiple calls to encode; one for each page which - * is involved. Record r2 is divided logically into sections r2a, r2b and r2c at the - * points where the page boundaries intersect with the record. Assuming a page size - * of ps, the page boundary pointers are represented by their names p0, p1... and the - * sizes of the record segments are represented by their names r1, r2a, r2b..., the calls - * should be as follows: - * <pre> - * encode(p0+r1, 0, ps-r1); (returns r2a data-blocks) - * encode(p1, r2a, ps); (returns r2b data-blocks which equals ps) - * encode(p2, r2a+r2b, ps); (returns r2c data-blocks) - * </pre> - * - * \param wptr Data-block-aligned pointer to position in page buffer where encoding is to - * take place. - * \param rec_offs_dblks Offset in data-blocks within record from which to start encoding. - * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr. - * \returns Number of data-blocks encoded. - */ - virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, - uint32_t max_size_dblks) = 0; - - /** - * \brief Decode into this instance of jrec from the read buffer at the disk-block-aligned - * pointer rptr starting at position jrec_offs_dblks in the encoded record to a - * maximum size of max_size_blks. - * - * This call decodes a record in the page buffer pointed to by the data-block-aligned - * (defined by JRNL_DBLK_SIZE) parameter rptr into this instance of jrec. No more than - * paramter max_size_dblks data-blocks may be read from the buffer. The parameter - * jrec_offs_dblks is the offset in data-blocks within the encoded record at which to start - * decoding. - * - * Decoding entails reading the record header, the data and the tail. The record is - * data-block-aligned (defined by JRNL_DBLK_SIZE); the returned value is the number of - * data-blocks read from the buffer by the decode action. As the record data size is only - * known once the header is read, the number of calls required to complete reading the - * record will depend on the vlaues within this instance which are set when the - * header is decoded. - * - * A non-zero value for jrec_offs_dblks implies that this is not the first call to - * decode and the record data will be appended at this offset. - * - * \param h Reference to instance of struct hdr, already read from page buffer and used - * to determine record type - * \param rptr Data-block-aligned pointer to position in page buffer where decoding is to - * begin. - * \param rec_offs_dblks Offset within record from which to start appending the decoded - * record. - * \param max_size_dblks Maximum number of data-blocks to read from pointer rptr. - * \returns Number of data-blocks read (consumed). - */ - virtual uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, - uint32_t max_size_dblks) = 0; - - virtual bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) = 0; + virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) = 0; + virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0; - virtual std::string& str(std::string& str) const = 0; - virtual std::size_t data_size() const = 0; - virtual std::size_t xid_size() const = 0; - virtual std::size_t rec_size() const = 0; - inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); } - static inline uint32_t size_dblks(const std::size_t size) - { return size_blks(size, QLS_DBLK_SIZE_BYTES); } - static inline uint32_t size_sblks(const std::size_t size) - { return size_blks(size, QLS_SBLK_SIZE_BYTES); } - static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize) - { return (size + blksize - 1)/blksize; } - virtual uint64_t rid() const = 0; + virtual std::string& str(std::string& str) const = 0; + virtual std::size_t data_size() const = 0; + virtual std::size_t xid_size() const = 0; + virtual std::size_t rec_size() const = 0; + inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); } + static inline uint32_t size_dblks(const std::size_t size) + { return size_blks(size, QLS_DBLK_SIZE_BYTES); } + static inline uint32_t size_sblks(const std::size_t size) + { return size_blks(size, QLS_SBLK_SIZE_BYTES); } + static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize) + { return (size + blksize - 1)/blksize; } + virtual uint64_t rid() const = 0; - protected: - virtual void chk_hdr() const = 0; - virtual void chk_hdr(uint64_t rid) const = 0; - virtual void chk_tail() const = 0; - static void chk_hdr(const rec_hdr_t& hdr); - static void chk_rid(const rec_hdr_t& hdr, uint64_t rid); - static void chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr); - virtual void clean() = 0; - }; // class jrec +protected: + virtual void clean() = 0; +}; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_JREC_H +#endif // ifndef QPID_LINEARSTORE_JRNL_JREC_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp index 32b024a09d..b561c6ca2b 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp @@ -29,10 +29,9 @@ #include "qpid/linearstore/jrnl/jerrno.h" #include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { pmgr::page_cb::page_cb(uint16_t index): _index(index), @@ -199,4 +198,4 @@ pmgr::page_state_str(page_state ps) return "<page_state unknown>"; } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h index 64143a9e6e..787828af01 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h @@ -19,16 +19,15 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_PMGR_H -#define QPID_LEGACYSTORE_JRNL_PMGR_H +#ifndef QPID_LINEARSTORE_JOURNAL_PMGR_H +#define QPID_LINEARSTORE_JOURNAL_PMGR_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class pmgr; class jcntl; -}} +}}} #include <deque> #include "qpid/linearstore/jrnl/aio.h" @@ -40,11 +39,11 @@ namespace qls_jrnl #include "qpid/linearstore/jrnl/txn_map.h" #include "qpid/linearstore/jrnl/txn_rec.h" -namespace qpid -{ -namespace qls_jrnl -{ -class JournalFile; +namespace qpid { +namespace linearstore { +namespace journal { + + class JournalFile; /** * \brief Abstract class for managing either read or write page cache of arbitrary size and @@ -122,6 +121,6 @@ class JournalFile; virtual void clean(); }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_PMGR_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_PMGR_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/slock.h b/qpid/cpp/src/qpid/linearstore/jrnl/slock.h index bbfee37353..b118f71bde 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/slock.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/slock.h @@ -19,17 +19,16 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H -#define QPID_LEGACYSTORE_JRNL_SLOCK_H +#ifndef QPID_LINEARSTORE_JOURNAL_SLOCK_H +#define QPID_LINEARSTORE_JOURNAL_SLOCK_H #include "qpid/linearstore/jrnl/jexception.h" #include "qpid/linearstore/jrnl/smutex.h" #include <pthread.h> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { // Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope class slock @@ -68,6 +67,6 @@ namespace qls_jrnl inline bool locked() const { return _locked; } }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_SLOCK_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h b/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h index 087e19730b..91520ce1ae 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h @@ -19,16 +19,15 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H -#define QPID_LEGACYSTORE_JRNL_SMUTEX_H +#ifndef QPID_LINEARSTORE_JOURNAL_SMUTEX_H +#define QPID_LINEARSTORE_JOURNAL_SMUTEX_H #include "qpid/linearstore/jrnl/jexception.h" #include <pthread.h> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { // Ultra-simple scoped mutex class that allows a posix mutex to be initialized and destroyed with error checks class smutex @@ -47,6 +46,6 @@ namespace qls_jrnl inline pthread_mutex_t* get() const { return &_m; } }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_SMUTEX_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp index 49e9a61704..5bb04496a4 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp @@ -23,10 +23,9 @@ #include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { const std::string time_ns::str(int precision) const @@ -39,5 +38,4 @@ time_ns::str(int precision) const return oss.str(); } - -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h b/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h index 3319573001..a228d47475 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h @@ -19,17 +19,16 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H -#define QPID_LEGACYSTORE_JRNL_TIME_NS_H +#ifndef QPID_LINEARSTORE_JOURNAL_TIME_NS_H +#define QPID_LINEARSTORE_JOURNAL_TIME_NS_H #include <cerrno> #include <ctime> #include <string> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { struct time_ns : public timespec { @@ -88,6 +87,6 @@ struct time_ns : public timespec { if(tv_sec == rhs.tv_sec) return tv_nsec <= rhs.tv_nsec; return tv_sec <= rhs.tv_sec; } }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_TIME_NS_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp index 9c32f4128b..2cc7966efb 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp @@ -27,10 +27,9 @@ #include "qpid/linearstore/jrnl/slock.h" #include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { // return/error codes int16_t txn_map::TMAP_RID_NOT_FOUND = -2; @@ -231,4 +230,4 @@ txn_map::xid_list(std::vector<std::string>& xv) } } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h index 146997abe0..419e043529 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h @@ -19,15 +19,14 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H -#define QPID_LEGACYSTORE_JRNL_TXN_MAP_H +#ifndef QPID_LINEARSTORE_JOURNAL_TXN_MAP_H +#define QPID_LINEARSTORE_JOURNAL_TXN_MAP_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class txn_map; -}} +}}} #include "qpid/linearstore/jrnl/smutex.h" #include <map> @@ -35,10 +34,9 @@ namespace qls_jrnl #include <string> #include <vector> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { /** * \struct txn_data_struct @@ -141,6 +139,6 @@ namespace qls_jrnl const txn_data_list get_tdata_list_nolock(const std::string& xid); }; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_TXN_MAP_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp index 3c78903520..20333be622 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp @@ -22,38 +22,20 @@ #include "qpid/linearstore/jrnl/txn_rec.h" #include <cassert> -#include <cerrno> -#include <cstdlib> #include <cstring> #include <iomanip> -#include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/jexception.h" -#include <sstream> -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { txn_rec::txn_rec(): -// _txn_hdr(), _xidp(0), _buff(0) -// _txn_tail() { - ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0); - ::rec_tail_init(&_txn_tail, 0, 0, 0); -} - -txn_rec::txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp, - const std::size_t xidlen/*, const bool owi*/): -// _txn_hdr(magic, RHM_JDAT_VERSION, rid, xidlen, owi), - _xidp(xidp), - _buff(0) -// _txn_tail(_txn_hdr) -{ - ::txn_hdr_init(&_txn_hdr, magic, QLS_JRNL_VERSION, 0, rid, xidlen); - ::rec_tail_copy(&_txn_tail, &_txn_hdr._rhdr, 0); + ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0, 0); + ::rec_tail_init(&_txn_tail, 0, 0, 0, 0); } txn_rec::~txn_rec() @@ -62,28 +44,17 @@ txn_rec::~txn_rec() } void -txn_rec::reset(const uint32_t magic) +txn_rec::reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp, + const std::size_t xidlen) { - _txn_hdr._rhdr._magic = magic; - _txn_hdr._rhdr._rid = 0; - _txn_hdr._xidsize = 0; - _xidp = 0; - _buff = 0; - _txn_tail._xmagic = ~magic; - _txn_tail._rid = 0; -} - -void -txn_rec::reset(const uint32_t magic, const uint64_t rid, const void* const xidp, - const std::size_t xidlen/*, const bool owi*/) -{ - _txn_hdr._rhdr._magic = magic; + _txn_hdr._rhdr._magic = commitFlag ? QLS_TXC_MAGIC : QLS_TXA_MAGIC; + _txn_hdr._rhdr._serial = serial; _txn_hdr._rhdr._rid = rid; -// _txn_hdr.set_owi(owi); _txn_hdr._xidsize = xidlen; _xidp = xidp; _buff = 0; - _txn_tail._xmagic = ~magic; + _txn_tail._xmagic = ~_txn_hdr._rhdr._magic; + _txn_tail._serial = serial; _txn_tail._rid = rid; } @@ -195,132 +166,16 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) return size_dblks(wr_cnt); } -uint32_t -txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) -{ - assert(rptr != 0); - assert(max_size_dblks > 0); - - std::size_t rd_cnt = 0; - if (rec_offs_dblks) // Continuation of record on new page - { - const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize); - const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t)); - const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; - - if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks) - { - // Remainder of xid fits within this page - if (rec_offs - sizeof(txn_hdr_t) < _txn_hdr._xidsize) - { - // Part of xid still outstanding, copy remainder of xid and tail - const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t); - const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs; - std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); - rd_cnt = xid_rem; - std::memcpy((void*)&_txn_tail, ((char*)rptr + rd_cnt), sizeof(_txn_tail)); - chk_tail(); - rd_cnt += sizeof(_txn_tail); - } - else - { - // Tail or part of tail only outstanding, complete tail - const std::size_t tail_offs = rec_offs - sizeof(txn_hdr_t) - _txn_hdr._xidsize; - const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs; - std::memcpy((char*)&_txn_tail + tail_offs, rptr, tail_rem); - chk_tail(); - rd_cnt = tail_rem; - } - } - else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks) - { - // Remainder of xid fits within this page, tail split - const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t); - const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs; - std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); - rd_cnt += xid_rem; - const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - if (tail_rem) - { - std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem); - rd_cnt += tail_rem; - } - } - else - { - // Remainder of xid split - const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES); - std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size); - rd_cnt += xid_cp_size; - } - } - else // Start of record - { - // Get and check header - //_txn_hdr.hdr_copy(h); - ::rec_hdr_copy(&_txn_hdr._rhdr, &h); - rd_cnt = sizeof(rec_hdr_t); -#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT) - rd_cnt += sizeof(uint32_t); // Filler 0 -#endif - _txn_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt); - rd_cnt = sizeof(txn_hdr_t); - chk_hdr(); - _buff = std::malloc(_txn_hdr._xidsize); - MALLOC_CHK(_buff, "_buff", "txn_rec", "decode"); - const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize); - const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize + - sizeof(rec_tail_t)); - - // Check if record (header + xid + tail) fits within this page, we can check the - // tail before the expense of copying data to memory - if (hdr_xid_tail_dblks <= max_size_dblks) - { - // Entire header, xid and tail fits within this page - std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize); - rd_cnt += _txn_hdr._xidsize; - std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, sizeof(_txn_tail)); - rd_cnt += sizeof(_txn_tail); - chk_tail(); - } - else if (hdr_xid_dblks <= max_size_dblks) - { - // Entire header and xid fit within this page, tail split - std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize); - rd_cnt += _txn_hdr._xidsize; - const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - if (tail_rem) - { - std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem); - rd_cnt += tail_rem; - } - } - else - { - // Header fits within this page, xid split - const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; - std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size); - rd_cnt += xid_cp_size; - } - } - return size_dblks(rd_cnt); -} - bool -txn_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) +txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { + uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { // Read header, allocate for xid //_txn_hdr.hdr_copy(h); ::rec_hdr_copy(&_txn_hdr._rhdr, &h); -#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT) - ifsp->ignore(sizeof(uint32_t)); // _filler0 -#endif - ifsp->read((char*)&_txn_hdr._xidsize, sizeof(std::size_t)); -#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT) - ifsp->ignore(sizeof(uint32_t)); // _filler0 -#endif + ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize)); rec_offs = sizeof(txn_hdr_t); _buff = std::malloc(_txn_hdr._xidsize); MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode"); @@ -358,8 +213,22 @@ txn_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) } } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); - chk_tail(); // Throws if tail invalid or record incomplete + if (::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, 0)) { // TODO: add checksum + throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail + } assert(!ifsp->fail() && !ifsp->bad()); + int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, checksum); + if (res != 0) { + std::stringstream oss; + switch (res) { + case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break; + case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break; + case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break; + case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _txn_tail._checksum; break; + default: oss << "Unknown error " << res; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info + } return true; } @@ -403,38 +272,9 @@ txn_rec::rec_size() const } void -txn_rec::chk_hdr() const -{ - jrec::chk_hdr(_txn_hdr._rhdr); - if (_txn_hdr._rhdr._magic != QLS_TXA_MAGIC && _txn_hdr._rhdr._magic != QLS_TXC_MAGIC) - { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "dtx magic: rid=0x" << std::setw(16) << _txn_hdr._rhdr._rid; - oss << ": expected=(0x" << std::setw(8) << QLS_TXA_MAGIC; - oss << " or 0x" << QLS_TXC_MAGIC; - oss << ") read=0x" << std::setw(2) << (int)_txn_hdr._rhdr._magic; - throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "txn_rec", "chk_hdr"); - } -} - -void -txn_rec::chk_hdr(uint64_t rid) const -{ - chk_hdr(); - jrec::chk_rid(_txn_hdr._rhdr, rid); -} - -void -txn_rec::chk_tail() const -{ - jrec::chk_tail(_txn_tail, _txn_hdr._rhdr); -} - -void txn_rec::clean() { // clean up allocated memory here } -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h index 4e9700e392..f256891664 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h @@ -19,70 +19,49 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H -#define QPID_LEGACYSTORE_JRNL_TXN_REC_H +#ifndef QPID_LINEARSTORE_JOURNAL_TXN_REC_H +#define QPID_LINEARSTORE_JOURNAL_TXN_REC_H -namespace qpid -{ -namespace qls_jrnl -{ -class txn_rec; -}} - -#include <cstddef> #include "qpid/linearstore/jrnl/jrec.h" #include "qpid/linearstore/jrnl/utils/txn_hdr.h" +#include "qpid/linearstore/jrnl/utils/rec_tail.h" -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { - /** - * \class txn_rec - * \brief Class to handle a single journal DTX commit or abort record. - */ - class txn_rec : public jrec - { - private: - txn_hdr_t _txn_hdr; ///< transaction header - const void* _xidp; ///< xid pointer for encoding (writing to disk) - void* _buff; ///< Pointer to buffer to receive data read from disk - rec_tail_t _txn_tail; ///< Record tail +/** +* \class txn_rec +* \brief Class to handle a single journal commit or abort record. +*/ +class txn_rec : public jrec +{ +private: + ::txn_hdr_t _txn_hdr; ///< Local instance of transaction header struct + const void* _xidp; ///< xid pointer for encoding (writing to disk) + void* _buff; ///< Pointer to buffer to receive data read from disk + ::rec_tail_t _txn_tail; ///< Local instance of enqueue tail struct - public: - // constructor used for read operations and xid must have memory allocated - txn_rec(); - // constructor used for write operations, where xid already exists - txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp, - const std::size_t xidlen/*, const bool owi*/); - virtual ~txn_rec(); +public: + txn_rec(); + virtual ~txn_rec(); - // Prepare instance for use in reading data from journal - void reset(const uint32_t magic); - // Prepare instance for use in writing data to journal - void reset(const uint32_t magic, const uint64_t rid, const void* const xidp, - const std::size_t xidlen/*, const bool owi*/); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); - uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, - uint32_t max_size_dblks); - // Decode used for recover - bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs); + void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp, + const std::size_t xidlen); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); - std::size_t get_xid(void** const xidpp); - std::string& str(std::string& str) const; - inline std::size_t data_size() const { return 0; } // This record never carries data - std::size_t xid_size() const; - std::size_t rec_size() const; - inline uint64_t rid() const { return _txn_hdr._rhdr._rid; } + std::size_t get_xid(void** const xidpp); + std::string& str(std::string& str) const; + inline std::size_t data_size() const { return 0; } // This record never carries data + std::size_t xid_size() const; + std::size_t rec_size() const; + inline uint64_t rid() const { return _txn_hdr._rhdr._rid; } - private: - void chk_hdr() const; - void chk_hdr(uint64_t rid) const; - void chk_tail() const; - virtual void clean(); - }; // class txn_rec +private: + virtual void clean(); +}; -}} +}}} -#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H +#endif // ifndef QPID_LINEARSTORE_JOURNAL_TXN_REC_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c index 0cdab3580c..144ce4125a 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c @@ -23,9 +23,9 @@ /*static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;*/ -void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t - rid, const uint64_t deq_rid, const uint64_t xidsize) { - rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid); +void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, + const uint64_t serial, const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize) { + rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid); dest->_deq_rid = deq_rid; dest->_xidsize = xidsize; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h index 25cfc71efb..3392867153 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h @@ -1,5 +1,5 @@ -#ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H -#define QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H +#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H +#define QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -42,12 +42,14 @@ extern "C"{ * The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of a * previous enqueue record being dequeued by this record. * - * Record header info in binary format (32 bytes): + * Record header info in binary format (40 bytes): * <pre> * 0 7 * +---+---+---+---+---+---+---+---+ -+ * | magic | ver | flags | | - * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t + * +---+---+---+---+---+---+---+---+ | + * | serial | | struct rec_hdr_t + * +---+---+---+---+---+---+---+---+ | * | rid | | * +---+---+---+---+---+---+---+---+ -+ * | deq-rid | @@ -67,7 +69,7 @@ typedef struct deq_hdr_t { static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10; void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize); + const uint64_t serial, const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize); void deq_hdr_copy(deq_hdr_t* dest, const deq_hdr_t* src); bool is_txn_coml_commit(const deq_hdr_t *dh); void set_txn_coml_commit(deq_hdr_t *dh, const bool commit); @@ -78,4 +80,4 @@ void set_txn_coml_commit(deq_hdr_t *dh, const bool commit); } #endif -#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H */ +#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c index b8d2da7722..b4e8b62ff1 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c @@ -25,8 +25,8 @@ //static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20; void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t xidsize, const uint64_t dsize) { - rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid); + const uint64_t serial, const uint64_t rid, const uint64_t xidsize, const uint64_t dsize) { + rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid); dest->_xidsize = xidsize; dest->_dsize = dsize; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h index 1beaef1db8..00108792bc 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h @@ -1,5 +1,5 @@ -#ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H -#define QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H +#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H +#define QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -38,12 +38,14 @@ extern "C"{ * * This header precedes all enqueue data in journal files. * - * Record header info in binary format (32 bytes): + * Record header info in binary format (40 bytes): * <pre> * 0 7 * +---+---+---+---+---+---+---+---+ -+ * | magic | ver | flags | | - * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t + * +---+---+---+---+---+---+---+---+ | + * | serial | | struct rec_hdr_t + * +---+---+---+---+---+---+---+---+ | * | rid | | * +---+---+---+---+---+---+---+---+ -+ * | xidsize | @@ -64,7 +66,7 @@ static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10; static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20; void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t xidsize, const uint64_t dsize); + const uint64_t serial, const uint64_t rid, const uint64_t xidsize, const uint64_t dsize); void enq_hdr_copy(enq_hdr_t* dest, const enq_hdr_t* src); bool is_enq_transient(const enq_hdr_t *eh); void set_enq_transient(enq_hdr_t *eh, const bool transient); @@ -78,4 +80,4 @@ bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t versio } #endif -#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H */ +#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c index dcc24bac8a..35b4ea219e 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c @@ -23,8 +23,8 @@ #include <string.h> void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks, - const uint16_t efp_partition, const uint64_t file_size) { - rec_hdr_init(&dest->_rhdr, magic, version, 0, 0); + const uint16_t efp_partition, const uint64_t file_size) { + rec_hdr_init(&dest->_rhdr, magic, version, 0, 0, 0); dest->_fhdr_size_sblks = fhdr_size_sblks; dest->_efp_partition = efp_partition; dest->_reserved = 0; @@ -36,10 +36,11 @@ void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t vers dest->_queue_name_len = 0; } -int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro, - const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) { +int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t serial, const uint64_t rid, + const uint64_t fro, const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) { file_hdr_t* fhp = (file_hdr_t*)dest; fhp->_rhdr._uflag = uflag; + fhp->_rhdr._serial = serial; fhp->_rhdr._rid = rid; fhp->_fro = fro; fhp->_file_number = file_number; @@ -54,6 +55,13 @@ int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, con return set_time_now(dest); } +int file_hdr_check(file_hdr_t* hdr, const uint32_t magic, const uint16_t version, const uint64_t data_size_kib) { + int res = rec_hdr_check_base(&hdr->_rhdr, magic, version); + if (res != 0) return 0; + if (hdr->_data_size_kib != data_size_kib) return 3; + return 0; +} + void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) { rec_hdr_copy(&dest->_rhdr, &src->_rhdr); dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied? @@ -84,6 +92,23 @@ int is_file_hdr_reset(file_hdr_t* target) { target->_queue_name_len == 0; } +/* +uint64_t random_64() { + int randomData = open("/dev/random", O_RDONLY); + if (randomData < 0) { + return 0ULL; + } + uint64_t randomNumber; + size_t size = sizeof(randomNumber); + ssize_t result = read(randomData, (char*)&randomNumber, size); + if (result != size) { + randomNumber = 0ULL; + } + close(randomData); + return randomNumber; +} +*/ + int set_time_now(file_hdr_t *fh) { struct timespec ts; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h index 6a53c631d5..53ca686fb8 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h @@ -1,5 +1,5 @@ -#ifndef QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H -#define QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H +#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H +#define QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -41,17 +41,19 @@ extern "C"{ * block in the file. The record ID and offset are updated on each overwrite of the * file. * - * File header info in binary format (66 bytes + size of file name in octets): + * File header info in binary format (74 bytes + size of file name in octets): * <pre> * 0 7 * +---+---+---+---+---+---+---+---+ -+ * | magic | ver | flags | | - * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t - * | first rid in file | | + * +---+---+---+---+---+---+---+---+ | + * | serial | | struct rec_hdr_t + * +---+---+---+---+---+---+---+---+ | + * | rid | | * +---+---+---+---+---+---+---+---+ -+ * | fs | partn | reserved | * +---+---+---+---+---+---+---+---+ - * | file-size | + * | data-size | * +---+---+---+---+---+---+---+---+ * | fro | * +---+---+---+---+---+---+---+---+ @@ -87,10 +89,11 @@ typedef struct file_hdr_t { uint16_t _queue_name_len; /**< Length of the queue name in octets, which follows this struct in the header */ } file_hdr_t; -void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks, - const uint16_t efp_partition, const uint64_t file_size); -int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro, - const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name); +void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, + const uint16_t fhdr_size_sblks, const uint16_t efp_partition, const uint64_t file_size); +int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t serial, const uint64_t rid, + const uint64_t fro, const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name); +int file_hdr_check(file_hdr_t* hdr, const uint32_t magic, const uint16_t version, const uint64_t data_size_kib); void file_hdr_reset(file_hdr_t* target); int is_file_hdr_reset(file_hdr_t* target); void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src); @@ -103,4 +106,4 @@ void set_time(file_hdr_t *fh, struct timespec *ts); } #endif -#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H */ +#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c index be56068b9e..ad5262f9a3 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c @@ -1,9 +1,10 @@ #include "rec_hdr.h" -void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t rid) { +void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t serial, const uint64_t rid) { dest->_magic = magic; dest->_version = version; dest->_uflag = uflag; + dest->_serial = serial; dest->_rid = rid; } @@ -11,5 +12,19 @@ void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src) { dest->_magic = src->_magic; dest->_version = src->_version; dest->_uflag = src->_uflag; + dest->_serial = src->_serial; dest->_rid = src->_rid; } + +int rec_hdr_check_base(rec_hdr_t* header, const uint32_t magic, const uint16_t version) { + if (header->_magic != magic) return 1; + if (header->_version != version) return 2; + return 0; +} + +int rec_hdr_check(rec_hdr_t* header, const uint32_t magic, const uint16_t version, const uint64_t serial) { + int res = rec_hdr_check_base(header, magic, version); + if (res != 0) return res; + if (header->_serial != serial) return 3; + return 0; +} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h index c843e21316..64349b5ab8 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h @@ -1,5 +1,5 @@ -#ifndef QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H -#define QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H +#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H +#define QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -34,11 +34,13 @@ extern "C"{ * This includes identification for the file type, the encoding version, endian * indicator and a record ID. * - * File header info in binary format (16 bytes): + * File header info in binary format (24 bytes): * <pre> * 0 7 * +---+---+---+---+---+---+---+---+ - * | magic | ver | flags | + * | magic | ver | uflag | + * +---+---+---+---+---+---+---+---+ + * | serial | * +---+---+---+---+---+---+---+---+ * | rid | * +---+---+---+---+---+---+---+---+ @@ -52,11 +54,14 @@ typedef struct rec_hdr_t { uint32_t _magic; /**< File type identifier (magic number) */ uint16_t _version; /**< File encoding version */ uint16_t _uflag; /**< User-defined flags */ + uint64_t _serial; /**< Serial number for this journal file */ uint64_t _rid; /**< Record ID (rotating 64-bit counter) */ } rec_hdr_t; -void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t rid); +void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t serial, const uint64_t rid); void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src); +int rec_hdr_check_base(rec_hdr_t* header, const uint32_t magic, const uint16_t version); +int rec_hdr_check(rec_hdr_t* header, const uint32_t magic, const uint16_t version, const uint64_t serial); #pragma pack() @@ -64,4 +69,4 @@ void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src); } #endif -#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H */ +#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c index 2e604eb6c6..88c68e2b78 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c @@ -21,14 +21,25 @@ #include "rec_tail.h" -void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t rid) { +void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t serial, + const uint64_t rid) { dest->_xmagic = xmagic; dest->_checksum = checksum; + dest->_serial = serial; dest->_rid = rid; } void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum) { dest->_xmagic = ~(src->_magic); dest->_checksum = checksum; + dest->_serial = src->_serial; dest->_rid = src->_rid; } + +int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum) { + if (tail->_xmagic != ~header->_magic) return 1; + if (tail->_serial != header->_serial) return 2; + if (tail->_rid != header->_rid) return 3; + if (tail->_checksum != checksum) return 4; + return 0; +} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h index 9fbd186ea0..5163580ead 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h @@ -1,5 +1,5 @@ -#ifndef QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H -#define QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H +#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H +#define QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -41,26 +41,32 @@ extern "C"{ * The checksum is used to verify the xid and/or data portion of the record * on recovery, and excludes the header and tail. * - * Record header info in binary format (16 bytes): + * Record header info in binary format (24 bytes): * <pre> * 0 7 * +---+---+---+---+---+---+---+---+ * | ~(magic) | checksum | * +---+---+---+---+---+---+---+---+ + * | serial | + * +---+---+---+---+---+---+---+---+ * | rid | * +---+---+---+---+---+---+---+---+ * - * rid = Record ID + * ~(magic) = 1's compliment of magic of matching record header + * rid = Record ID of matching record header * </pre> */ typedef struct rec_tail_t { uint32_t _xmagic; /**< Binary inverse (1's complement) of hdr magic number */ - uint32_t _checksum; /**< Checksum of xid and data */ - uint64_t _rid; /**< ID (rotating 64-bit counter) */ + uint32_t _checksum; /**< Checksum of xid and data (excluding header itself) */ + uint64_t _serial; /**< Serial number for this journal file */ + uint64_t _rid; /**< Record ID (rotating 64-bit counter) */ } rec_tail_t; -void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t rid); +void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t serial, + const uint64_t rid); void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum); +int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum); #pragma pack() @@ -68,4 +74,4 @@ void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checks } #endif -#endif /* ifnedf QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H */ +#endif /* ifnedf QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c index 3b94bf3bb8..58d4cdebe4 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c @@ -22,8 +22,8 @@ #include "txn_hdr.h" void txn_hdr_init(txn_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t xidsize) { - rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid); + const uint64_t serial, const uint64_t rid, const uint64_t xidsize) { + rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid); dest->_xidsize = xidsize; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h index 02c00e09b3..442a1d373d 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h @@ -1,5 +1,5 @@ -#ifndef QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H -#define QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H +#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H +#define QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -40,12 +40,14 @@ extern "C"{ * Note that this record had its own rid distinct from the rids of the record(s) making up the * transaction it is committing or aborting. * - * Record header info in binary format (24 bytes): + * Record header info in binary format (32 bytes): * <pre> * 0 7 * +---+---+---+---+---+---+---+---+ -+ - * | magic | v | e | flags | | - * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t + * | magic | ver | flags | | + * +---+---+---+---+---+---+---+---+ | + * | serial | | struct rec_hdr_t + * +---+---+---+---+---+---+---+---+ | * | rid | | * +---+---+---+---+---+---+---+---+ -+ * | xidsize | @@ -58,7 +60,7 @@ typedef struct txn_hdr_t { } txn_hdr_t; void txn_hdr_init(txn_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t xidsize); + const uint64_t serial, const uint64_t rid, const uint64_t xidsize); void txn_hdr_copy(txn_hdr_t* dest, const txn_hdr_t* src); #pragma pack() @@ -67,4 +69,4 @@ void txn_hdr_copy(txn_hdr_t* dest, const txn_hdr_t* src); } #endif -#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H */ +#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp index 991e26a3b9..9f91132f80 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp @@ -35,10 +35,9 @@ //#include <iostream> // DEBUG -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { wmgr::wmgr(jcntl* jc, enq_map& emap, @@ -150,7 +149,7 @@ wmgr::enqueue(const void* const data_buff, } uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId(); - _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external); + _enq_rec.reset(_lfc.getCurrentSerial(), rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external); if (!cont) { dtokp->set_rid(rid); @@ -265,7 +264,7 @@ wmgr::dequeue(data_tok* dtokp, const bool ext_rid = dtokp->external_rid(); uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _lfc.getNextRecordId(); uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid(); - _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit); + _deq_rec.reset(_lfc.getCurrentSerial(), rid, dequeue_rid, xid_ptr, xid_len, txn_coml_commit); if (!cont) { if (!ext_rid) @@ -391,7 +390,7 @@ wmgr::abort(data_tok* dtokp, } uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId(); - _txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/); + _txn_rec.reset(false, _lfc.getCurrentSerial(), rid, xid_ptr, xid_len); if (!cont) { dtokp->set_rid(rid); @@ -489,7 +488,7 @@ wmgr::commit(data_tok* dtokp, } uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId(); - _txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/); + _txn_rec.reset(true, _lfc.getCurrentSerial(), rid, xid_ptr, xid_len); if (!cont) { dtokp->set_rid(rid); @@ -1033,4 +1032,4 @@ wmgr::status_str() const const char* wmgr::_op_str[] = {"enqueue", "dequeue", "abort", "commit"}; -}} +}}} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h index 6b455febd9..0278317ae9 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h @@ -19,15 +19,14 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_WMGR_H -#define QPID_LEGACYSTORE_JRNL_WMGR_H +#ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H +#define QPID_LINEARSTORE_JOURNAL_WMGR_H -namespace qpid -{ -namespace qls_jrnl -{ +namespace qpid { +namespace linearstore { +namespace journal { class wmgr; -}} +}}} #include <cstring> #include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" @@ -37,123 +36,123 @@ class wmgr; class file_hdr_t; -namespace qpid -{ -namespace qls_jrnl +namespace qpid { +namespace linearstore { +namespace journal { + +class LinearFileController; + +/** +* \brief Class for managing a write page cache of arbitrary size and number of pages. +* +* The write page cache works on the principle of caching the write data within a page until +* that page is either full or flushed; this initiates a single AIO write operation to store +* the data on disk. +* +* The maximum disk throughput is achieved by keeping the write operations of uniform size. +* Waiting for a page cache to fill achieves this; and in high data volume/throughput situations +* achieves the optimal disk throughput. Calling flush() forces a write of the current page cache +* no matter how full it is, and disrupts the uniformity of the write operations. This should +* normally only be done if throughput drops and there is a danger of a page of unwritten data +* waiting around for excessive time. +* +* The usual tradeoff between data storage latency and throughput performance applies. +*/ +class wmgr : public pmgr { - class LinearFileController; - - /** - * \brief Class for managing a write page cache of arbitrary size and number of pages. - * - * The write page cache works on the principle of caching the write data within a page until - * that page is either full or flushed; this initiates a single AIO write operation to store - * the data on disk. - * - * The maximum disk throughput is achieved by keeping the write operations of uniform size. - * Waiting for a page cache to fill achieves this; and in high data volume/throughput situations - * achieves the optimal disk throughput. Calling flush() forces a write of the current page cache - * no matter how full it is, and disrupts the uniformity of the write operations. This should - * normally only be done if throughput drops and there is a danger of a page of unwritten data - * waiting around for excessive time. - * - * The usual tradeoff between data storage latency and throughput performance applies. - */ - class wmgr : public pmgr - { - private: - LinearFileController& _lfc; ///< Linear File Controller ref - uint32_t _max_dtokpp; ///< Max data writes per page - uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit - uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks) - std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list - - // TODO: Convert _enq_busy etc into a proper threadsafe lock - // TODO: Convert to enum? Are these encodes mutually exclusive? - bool _enq_busy; ///< Flag true if enqueue is in progress - bool _deq_busy; ///< Flag true if dequeue is in progress - bool _abort_busy; ///< Flag true if abort is in progress - bool _commit_busy; ///< Flag true if commit is in progress - - enum _op_type { WMGR_ENQUEUE = 0, WMGR_DEQUEUE, WMGR_ABORT, WMGR_COMMIT }; - static const char* _op_str[]; - - enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding - deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding - txn_rec _txn_rec; ///< Transaction record used for encoding/decoding - std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts - - public: - wmgr(jcntl* jc, - enq_map& emap, - txn_map& tmap, - LinearFileController& lfc); - wmgr(jcntl* jc, - enq_map& emap, - txn_map& tmap, - LinearFileController& lfc, - const uint32_t max_dtokpp, - const uint32_t max_iowait_us); - virtual ~wmgr(); - - void initialize(aio_callback* const cbp, - const uint32_t wcache_pgsize_sblks, - const uint16_t wcache_num_pages, - const uint32_t max_dtokpp, - const uint32_t max_iowait_us, - std::size_t eo = 0); - iores enqueue(const void* const data_buff, - const std::size_t tot_data_len, - const std::size_t this_data_len, - data_tok* dtokp, - const void* const xid_ptr, - const std::size_t xid_len, - const bool transient, - const bool external); - iores dequeue(data_tok* dtokp, - const void* const xid_ptr, - const std::size_t xid_len, - const bool txn_coml_commit); - iores abort(data_tok* dtokp, - const void* const xid_ptr, - const std::size_t xid_len); - iores commit(data_tok* dtokp, - const void* const xid_ptr, - const std::size_t xid_len); - iores flush(); - int32_t get_events(timespec* const timeout, - bool flush); - bool is_txn_synced(const std::string& xid); - inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; } - inline uint32_t unflushed_dblks() { return _cached_offset_dblks; } - - // Debug aid - const std::string status_str() const; - - private: - void initialize(aio_callback* const cbp, - const uint32_t wcache_pgsize_sblks, - const uint16_t wcache_num_pages); - iores pre_write_check(const _op_type op, - const data_tok* const dtokp, - const std::size_t xidsize = 0, - const std::size_t dsize = 0, - const bool external = false) const; - void dequeue_check(const std::string& xid, - const uint64_t drid); - void file_header_check(const uint64_t rid, - const bool cont, - const uint32_t rec_dblks_rem); - void flush_check(iores& res, - bool& cont, - bool& done, const uint64_t rid); - iores write_flush(); - void get_next_file(); - void dblk_roundup(); - void rotate_page(); - void clean(); - }; - -}} - -#endif // ifndef QPID_LEGACYSTORE_JRNL_WMGR_H +private: + LinearFileController& _lfc; ///< Linear File Controller ref + uint32_t _max_dtokpp; ///< Max data writes per page + uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit + uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks) + std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list + + // TODO: Convert _enq_busy etc into a proper threadsafe lock + // TODO: Convert to enum? Are these encodes mutually exclusive? + bool _enq_busy; ///< Flag true if enqueue is in progress + bool _deq_busy; ///< Flag true if dequeue is in progress + bool _abort_busy; ///< Flag true if abort is in progress + bool _commit_busy; ///< Flag true if commit is in progress + + enum _op_type { WMGR_ENQUEUE = 0, WMGR_DEQUEUE, WMGR_ABORT, WMGR_COMMIT }; + static const char* _op_str[]; + + enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding + deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding + txn_rec _txn_rec; ///< Transaction record used for encoding/decoding + std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts + +public: + wmgr(jcntl* jc, + enq_map& emap, + txn_map& tmap, + LinearFileController& lfc); + wmgr(jcntl* jc, + enq_map& emap, + txn_map& tmap, + LinearFileController& lfc, + const uint32_t max_dtokpp, + const uint32_t max_iowait_us); + virtual ~wmgr(); + + void initialize(aio_callback* const cbp, + const uint32_t wcache_pgsize_sblks, + const uint16_t wcache_num_pages, + const uint32_t max_dtokpp, + const uint32_t max_iowait_us, + std::size_t eo = 0); + iores enqueue(const void* const data_buff, + const std::size_t tot_data_len, + const std::size_t this_data_len, + data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len, + const bool transient, + const bool external); + iores dequeue(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len, + const bool txn_coml_commit); + iores abort(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len); + iores commit(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len); + iores flush(); + int32_t get_events(timespec* const timeout, + bool flush); + bool is_txn_synced(const std::string& xid); + inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; } + inline uint32_t unflushed_dblks() { return _cached_offset_dblks; } + + // Debug aid + const std::string status_str() const; + +private: + void initialize(aio_callback* const cbp, + const uint32_t wcache_pgsize_sblks, + const uint16_t wcache_num_pages); + iores pre_write_check(const _op_type op, + const data_tok* const dtokp, + const std::size_t xidsize = 0, + const std::size_t dsize = 0, + const bool external = false) const; + void dequeue_check(const std::string& xid, + const uint64_t drid); + void file_header_check(const uint64_t rid, + const bool cont, + const uint32_t rec_dblks_rem); + void flush_check(iores& res, + bool& cont, + bool& done, const uint64_t rid); + iores write_flush(); + void get_next_file(); + void dblk_roundup(); + void rotate_page(); + void clean(); +}; + +}}} + +#endif // ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H |
