summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-11-14 20:39:32 +0000
committerKim van der Riet <kpvdr@apache.org>2013-11-14 20:39:32 +0000
commit32b480e3a6a1173195bc1a322b3a14852ca03068 (patch)
tree4f888832787daed51b3e09a57187b101f269e60c /qpid/cpp/src
parent20b2cbd6fa546671ab6b3255a1f9f1e7385c60ea (diff)
downloadqpid-python-32b480e3a6a1173195bc1a322b3a14852ca03068.tar.gz
QPID-4984: Fix for recovery ambiguity issue, other code tidy-ups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1542066 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/BindingDbt.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/BufferValue.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/Cursor.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/DataTokenImpl.h8
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES7
-rw-r--r--qpid/cpp/src/qpid/linearstore/IdDbt.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/IdSequence.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.cpp14
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.h46
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp6
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalLogImpl.h14
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp114
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/PreparedTransaction.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/StoreException.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp10
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h11
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp5
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h18
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp13
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h5
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp5
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h18
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h12
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp59
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h28
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp5
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h11
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp16
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h14
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp269
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h19
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/aio.h15
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h31
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/cvar.h15
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp57
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h46
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp233
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h98
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp9
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp313
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h114
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/enums.h55
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp13
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp9
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jdir.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp9
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp9
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jexception.h25
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp81
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jrec.h223
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp9
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h27
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/slock.h15
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/smutex.h15
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp10
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h15
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp9
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp220
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h91
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c6
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h14
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c4
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h14
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c33
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h25
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c17
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h17
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c13
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h22
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c4
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h16
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp17
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h251
81 files changed, 1217 insertions, 1929 deletions
diff --git a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
index 67b8b52c6c..7863940534 100644
--- a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
@@ -115,7 +115,7 @@ u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const
// For zero value, use default
p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
- } else if ( p > 128 || p & (p-1) ) {
+ } else if ( p > 128 || (p & (p-1)) ) {
// For any positive value that is not a power of 2, use closest value
if (p < 6) p = 4;
else if (p < 12) p = 8;
diff --git a/qpid/cpp/src/qpid/linearstore/BindingDbt.h b/qpid/cpp/src/qpid/linearstore/BindingDbt.h
index 7297ea7c7c..e5d61de248 100644
--- a/qpid/cpp/src/qpid/linearstore/BindingDbt.h
+++ b/qpid/cpp/src/qpid/linearstore/BindingDbt.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_BINDINGDBT_H
-#define QPID_LEGACYSTORE_BINDINGDBT_H
+#ifndef QPID_LINEARSTORE_BINDINGDBT_H
+#define QPID_LINEARSTORE_BINDINGDBT_H
#include "db-inc.h"
#include "qpid/broker/PersistableExchange.h"
@@ -53,4 +53,4 @@ public:
}}
-#endif // ifndef QPID_LEGACYSTORE_BINDINGDBT_H
+#endif // ifndef QPID_LINEARSTORE_BINDINGDBT_H
diff --git a/qpid/cpp/src/qpid/linearstore/BufferValue.h b/qpid/cpp/src/qpid/linearstore/BufferValue.h
index 5af1ac6a43..daeb81306a 100644
--- a/qpid/cpp/src/qpid/linearstore/BufferValue.h
+++ b/qpid/cpp/src/qpid/linearstore/BufferValue.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_BUFFERVALUE_H
-#define QPID_LEGACYSTORE_BUFFERVALUE_H
+#ifndef QPID_LINEARSTORE_BUFFERVALUE_H
+#define QPID_LINEARSTORE_BUFFERVALUE_H
#include "db-inc.h"
#include "qpid/broker/Persistable.h"
@@ -43,4 +43,4 @@ public:
}}
-#endif // ifndef QPID_LEGACYSTORE_BUFFERVALUE_H
+#endif // ifndef QPID_LINEARSTORE_BUFFERVALUE_H
diff --git a/qpid/cpp/src/qpid/linearstore/Cursor.h b/qpid/cpp/src/qpid/linearstore/Cursor.h
index c7e7f34faa..0287803b21 100644
--- a/qpid/cpp/src/qpid/linearstore/Cursor.h
+++ b/qpid/cpp/src/qpid/linearstore/Cursor.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_CURSOR_H
-#define QPID_LEGACYSTORE_CURSOR_H
+#ifndef QPID_LINEARSTORE_CURSOR_H
+#define QPID_LINEARSTORE_CURSOR_H
#include <boost/shared_ptr.hpp>
#include "db-inc.h"
@@ -47,4 +47,4 @@ public:
}}
-#endif // ifndef QPID_LEGACYSTORE_CURSOR_H
+#endif // ifndef QPID_LINEARSTORE_CURSOR_H
diff --git a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
index 17de01d237..154dacc5cb 100644
--- a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H
-#define QPID_LEGACYSTORE_DATATOKENIMPL_H
+#ifndef QPID_LINEARSTORE_DATATOKENIMPL_H
+#define QPID_LINEARSTORE_DATATOKENIMPL_H
#include "qpid/linearstore/jrnl/data_tok.h"
#include "qpid/broker/PersistableMessage.h"
@@ -29,7 +29,7 @@
namespace qpid{
namespace linearstore{
-class DataTokenImpl : public qpid::qls_jrnl::data_tok, public qpid::RefCounted
+class DataTokenImpl : public qpid::linearstore::journal::data_tok, public qpid::RefCounted
{
private:
boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
@@ -44,4 +44,4 @@ class DataTokenImpl : public qpid::qls_jrnl::data_tok, public qpid::RefCounted
} // namespace msgstore
} // namespace mrg
-#endif // ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H
+#endif // ifndef QPID_LINEARSTORE_DATATOKENIMPL_H
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index 0d15744694..7bbb9dc6a4 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -3,8 +3,8 @@ LinearStore issues:
Store:
------
-1. Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record start,
- no way of discriminating old from new at boundary (used to use OWI).
+1. (SOLVED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record
+ start, no way of discriminating old from new at boundary (used to use OWI).
2. Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve #1 first.
@@ -12,6 +12,9 @@ Store:
4. Rework qpid management parameters and controls.
+5. Consistent logging: rework logging to provide uniform and consistent logging from store (both logging level and
+ places where logging occurs).
+
Tests
-----
diff --git a/qpid/cpp/src/qpid/linearstore/IdDbt.h b/qpid/cpp/src/qpid/linearstore/IdDbt.h
index f8bb0647db..c7264491ab 100644
--- a/qpid/cpp/src/qpid/linearstore/IdDbt.h
+++ b/qpid/cpp/src/qpid/linearstore/IdDbt.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_IDDBT_H
-#define QPID_LEGACYSTORE_IDDBT_H
+#ifndef QPID_LINEARSTORE_IDDBT_H
+#define QPID_LINEARSTORE_IDDBT_H
#include "db-inc.h"
@@ -39,4 +39,4 @@ public:
}}
-#endif // ifndef QPID_LEGACYSTORE_IDDBT_H
+#endif // ifndef QPID_LINEARSTORE_IDDBT_H
diff --git a/qpid/cpp/src/qpid/linearstore/IdSequence.h b/qpid/cpp/src/qpid/linearstore/IdSequence.h
index 11b31a2813..17996eec52 100644
--- a/qpid/cpp/src/qpid/linearstore/IdSequence.h
+++ b/qpid/cpp/src/qpid/linearstore/IdSequence.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_IDSEQUENCE_H
-#define QPID_LEGACYSTORE_IDSEQUENCE_H
+#ifndef QPID_LINEARSTORE_IDSEQUENCE_H
+#define QPID_LINEARSTORE_IDSEQUENCE_H
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
@@ -40,4 +40,4 @@ public:
}}
-#endif // ifndef QPID_LEGACYSTORE_IDSEQUENCE_H
+#endif // ifndef QPID_LINEARSTORE_IDSEQUENCE_H
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
index 4f904665ee..354fca4f7e 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
@@ -36,7 +36,7 @@
#include "qmf/org/apache/qpid/linearstore/EventFull.h"
#include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
-using namespace qpid::qls_jrnl;
+using namespace qpid::linearstore::journal;
using namespace qpid::linearstore;
using qpid::management::ManagementAgent;
namespace _qmf = qmf::org::apache::qpid::linearstore;
@@ -128,10 +128,10 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
void
-JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_,
+JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::qls_jrnl::aio_callback* const cbp)
+ qpid::linearstore::journal::aio_callback* const cbp)
{
// efpp->createJournal(_jdir);
// QLS_LOG2(notice, _jid, "Initialized");
@@ -167,10 +167,10 @@ JournalImpl::recover(/*const uint16_t num_jfiles,
const bool auto_expand,
const uint16_t ae_max_jfiles,
const uint32_t jfsize_sblks,*/
- boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
+ boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::qls_jrnl::aio_callback* const cbp,
+ qpid::linearstore::journal::aio_callback* const cbp,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
uint64_t& highest_rid,
uint64_t queue_id)
@@ -463,12 +463,12 @@ JournalImpl::handleIoResult(const iores r)
writeActivityFlag = true;
switch (r)
{
- case qpid::qls_jrnl::RHM_IORES_SUCCESS:
+ case qpid::linearstore::journal::RHM_IORES_SUCCESS:
return;
default:
{
std::ostringstream oss;
- oss << "Unexpected I/O response (" << qpid::qls_jrnl::iores_str(r) << ").";
+ oss << "Unexpected I/O response (" << qpid::linearstore::journal::iores_str(r) << ").";
QLS_LOG2(error, _jid, oss.str());
THROW_STORE_FULL_EXCEPTION(oss.str());
}
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
index ba62b1d64a..59050e3875 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
@@ -41,14 +41,14 @@ namespace qpid{
namespace sys {
class Timer;
}
-namespace qls_jrnl {
-class EmptyFilePool;
-}
namespace linearstore{
class JournalImpl;
class JournalLogImpl;
+namespace journal {
+ class EmptyFilePool;
+}
class InactivityFireEvent : public qpid::sys::TimerTask
{
@@ -74,7 +74,7 @@ class GetEventsFireEvent : public qpid::sys::TimerTask
inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
};
-class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jrnl::jcntl, public qpid::qls_jrnl::aio_callback
+class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linearstore::journal::jcntl, public qpid::linearstore::journal::aio_callback
{
public:
typedef boost::function<void (JournalImpl&)> DeleteCallback;
@@ -110,26 +110,26 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
void initManagement(qpid::management::ManagementAgent* agent);
- void initialize(qpid::qls_jrnl::EmptyFilePool* efp,
+ void initialize(qpid::linearstore::journal::EmptyFilePool* efp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::qls_jrnl::aio_callback* const cbp);
+ qpid::linearstore::journal::aio_callback* const cbp);
- inline void initialize(qpid::qls_jrnl::EmptyFilePool* efpp,
+ inline void initialize(qpid::linearstore::journal::EmptyFilePool* efpp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks) {
initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this);
}
- void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
+ void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::qls_jrnl::aio_callback* const cbp,
+ qpid::linearstore::journal::aio_callback* const cbp,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
uint64_t& highest_rid,
uint64_t queue_id);
- inline void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
+ inline void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
@@ -142,38 +142,38 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
// Overrides for write inactivity timer
void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp,
+ const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp,
const bool transient = false);
- void enqueue_extern_data_record(const size_t tot_data_len, qpid::qls_jrnl::data_tok* dtokp,
+ void enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
const bool transient = false);
void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp, const std::string& xid,
+ const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid,
const bool transient = false);
- void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::qls_jrnl::data_tok* dtokp,
+ void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
const std::string& xid, const bool transient = false);
- void dequeue_data_record(qpid::qls_jrnl::data_tok* const dtokp, const bool txn_coml_commit = false);
+ void dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
- void dequeue_txn_data_record(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+ void dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
- void txn_abort(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid);
+ void txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
- void txn_commit(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid);
+ void txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
void stop(bool block_till_aio_cmpl = false);
// Overrides for get_events timer
- qpid::qls_jrnl::iores flush(const bool block_till_aio_cmpl = false);
+ qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl = false);
// TimerTask callback
void getEventsFire();
void flushFire();
// AIO callbacks
- virtual void wr_aio_cb(std::vector<qpid::qls_jrnl::data_tok*>& dtokl);
+ virtual void wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl);
virtual void rd_aio_cb(std::vector<uint16_t>& pil);
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
@@ -194,7 +194,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
timer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
- void handleIoResult(const qpid::qls_jrnl::iores r);
+ void handleIoResult(const qpid::linearstore::journal::iores r);
// Management instrumentation callbacks overridden from jcntl
inline void instr_incr_outstanding_aio_cnt() {
@@ -222,9 +222,9 @@ class TplJournalImpl : public JournalImpl
virtual ~TplJournalImpl() {}
// Special version of read_data_record that ignores transactions - needed when reading the TPL
- inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize,
+ inline qpid::linearstore::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
- qpid::qls_jrnl::data_tok* const dtokp) {
+ qpid::linearstore::journal::data_tok* const dtokp) {
return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
}
}; // class TplJournalImpl
diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
index b7935b1509..85e977595d 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
@@ -24,11 +24,11 @@
namespace qpid {
namespace linearstore {
-JournalLogImpl::JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold) : qpid::qls_jrnl::JournalLog(logLevelThreshold) {}
+JournalLogImpl::JournalLogImpl(const qpid::linearstore::journal::JournalLog::log_level_t logLevelThreshold) : qpid::linearstore::journal::JournalLog(logLevelThreshold) {}
JournalLogImpl::~JournalLogImpl() {}
void
-JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
+JournalLogImpl::log(const qpid::linearstore::journal::JournalLog::log_level_t level,
const std::string& log_stmt) const {
switch (level) {
case LOG_CRITICAL: QPID_LOG(critical, "Linear Store: " << log_stmt); break;
@@ -42,7 +42,7 @@ JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
}
void
-JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
+JournalLogImpl::log(const qpid::linearstore::journal::JournalLog::log_level_t level,
const std::string& jid,
const std::string& log_stmt) const {
switch (level) {
diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
index 43c2d7dc9c..538b383945 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_LOG_H
-#define QPID_LEGACYSTORE_LOG_H
+#ifndef QPID_LINEARSTORE_LOG_H
+#define QPID_LINEARSTORE_LOG_H
#include "qpid/linearstore/jrnl/JournalLog.h"
#include "qpid/log/Statement.h"
@@ -31,18 +31,18 @@
namespace qpid {
namespace linearstore {
-class JournalLogImpl : public qpid::qls_jrnl::JournalLog
+class JournalLogImpl : public qpid::linearstore::journal::JournalLog
{
public:
- JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold);
+ JournalLogImpl(const qpid::linearstore::journal::JournalLog::log_level_t logLevelThreshold);
virtual ~JournalLogImpl();
- virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel,
+ virtual void log(const qpid::linearstore::journal::JournalLog::log_level_t logLevel,
const std::string& logStatement) const;
- virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel,
+ virtual void log(const qpid::linearstore::journal::JournalLog::log_level_t logLevel,
const std::string& journalId,
const std::string& logStatement) const;
};
}} // namespace qpid::linearstore
-#endif // QPID_LEGACYSTORE_LOG_H
+#endif // QPID_LINEARSTORE_LOG_H
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 100c8925de..3d06adaa67 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -72,7 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en
isInit(false),
envPath(envpath_),
broker(broker_),
- jrnlLog(qpid::qls_jrnl::JournalLog::LOG_NOTICE),
+ jrnlLog(qpid::linearstore::journal::JournalLog::LOG_NOTICE),
mgmtObject(),
agent(0)
{}
@@ -119,13 +119,13 @@ uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_)
}
}
-qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition_,
+qpid::linearstore::journal::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition_,
const std::string& /*paramName_*/) {
// TODO: check against list of existing partitions, throw if not found
return partition_;
}
-qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_,
+qpid::linearstore::journal::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib_,
const std::string& paramName_) {
uint8_t rem = efpFileSizeKib_ % uint64_t(QLS_SBLK_SIZE_KIB);
if (rem != 0) {
@@ -171,8 +171,8 @@ bool MessageStoreImpl::init(const qpid::Options* options_)
{
// Extract and check options
const StoreOptions* opts = static_cast<const StoreOptions*>(options_);
- qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition");
- qpid::qls_jrnl::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
+ qpid::linearstore::journal::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition");
+ qpid::linearstore::journal::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
@@ -182,8 +182,8 @@ bool MessageStoreImpl::init(const qpid::Options* options_)
// These params, taken from options, are assumed to be correct and verified
bool MessageStoreImpl::init(const std::string& storeDir_,
- qpid::qls_jrnl::efpPartitionNumber_t efpPartition_,
- qpid::qls_jrnl::efpDataSize_kib_t efpFileSize_kib_,
+ qpid::linearstore::journal::efpPartitionNumber_t efpPartition_,
+ qpid::linearstore::journal::efpDataSize_kib_t efpFileSize_kib_,
const bool truncateFlag_,
uint32_t wCachePageSizeKib_,
uint32_t tplWCachePageSizeKib_)
@@ -230,7 +230,7 @@ void MessageStoreImpl::init()
}
try {
- qpid::qls_jrnl::jdir::create_dir(getBdbBaseDir());
+ qpid::linearstore::journal::jdir::create_dir(getBdbBaseDir());
dbenv.reset(new DbEnv(0));
dbenv->set_errpfx("linearstore");
@@ -282,7 +282,7 @@ void MessageStoreImpl::init()
THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e);
} catch (const StoreException&) {
throw;
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
QLS_LOG(error, "Journal Exception occurred while initializing store: " << e);
THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what());
} catch (...) {
@@ -291,7 +291,7 @@ void MessageStoreImpl::init()
}
} while (!isInit);
- efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(),
+ efpMgr.reset(new qpid::linearstore::journal::EmptyFilePoolManager(getStoreTopLevelDir(),
defaultEfpPartitionNumber,
defaultEfpFileSize_kib,
jrnlLog));
@@ -337,9 +337,9 @@ void MessageStoreImpl::truncateInit()
// TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
- qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir());
- qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir());
- qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir());
+ qpid::linearstore::journal::jdir::delete_dir(getBdbBaseDir());
+ qpid::linearstore::journal::jdir::delete_dir(getJrnlBaseDir());
+ qpid::linearstore::journal::jdir::delete_dir(getTplBaseDir());
QLS_LOG(notice, "Store directory " << getStoreTopLevelDir() << " was truncated.");
init();
}
@@ -349,7 +349,7 @@ void MessageStoreImpl::chkTplStoreInit()
// Prevent multiple threads from late-initializing the TPL
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
- qpid::qls_jrnl::jdir::create_dir(getTplBaseDir());
+ qpid::linearstore::journal::jdir::create_dir(getTplBaseDir());
tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
@@ -379,7 +379,7 @@ MessageStoreImpl::~MessageStoreImpl()
closeDbs();
} catch (const DbException& e) {
QLS_LOG(error, "Error closing BDB databases: " << e.what());
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
QLS_LOG(error, "Error: " << e.what());
} catch (const std::exception& e) {
QLS_LOG(error, "Error: " << e.what());
@@ -420,7 +420,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks);
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what());
}
try {
@@ -432,28 +432,28 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
}
}
-qpid::qls_jrnl::EmptyFilePool*
-MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_,
- const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_) {
- qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_);
+qpid::linearstore::journal::EmptyFilePool*
+MessageStoreImpl::getEmptyFilePool(const qpid::linearstore::journal::efpPartitionNumber_t efpPartitionNumber_,
+ const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib_) {
+ qpid::linearstore::journal::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_);
if (efpp == 0) {
std::ostringstream oss;
oss << "Partition=" << efpPartitionNumber_ << "; EfpFileSize=" << efpFileSizeKib_ << " KiB";
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool");
}
return efpp;
}
-qpid::qls_jrnl::EmptyFilePool*
+qpid::linearstore::journal::EmptyFilePool*
MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) {
qpid::framing::FieldTable::ValuePtr value;
- qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber;
+ qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber;
value = args_.get("qpid.efp_partition");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
}
- qpid::qls_jrnl::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
+ qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
value = args_.get("qpid.efp_file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
@@ -694,7 +694,6 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
txn_list& prepared,
message_index& messages)
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()");
Cursor queues;
queues.open(queueDb, txn.get());
@@ -731,7 +730,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
long rcnt = 0L; // recovered msg count
long idcnt = 0L; // in-doubt msg count
uint64_t thisHighestRid = 0ULL;
- jQueue->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id);
+ jQueue->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id);
// Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
// from recovery of a store that has had its size changed externally by the resize utility.
@@ -757,7 +756,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
QLS_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt.");
jQueue->recover_complete(); // start journal.
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
}
//read all messages: done on a per queue basis if using Journal
@@ -869,7 +868,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
long& rcnt,
long& idcnt)
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue->getName() << "\"");
size_t preambleLength = sizeof(uint32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -894,11 +892,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
try {
unsigned aio_sleep_cnt = 0;
while (read) {
- qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+ qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
switch (res)
{
- case qpid::qls_jrnl::RHM_IORES_SUCCESS: {
+ case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
msg_count++;
qpid::broker::RecoverableMessage::shared_ptr msg;
char* data = (char*)dbuff;
@@ -948,11 +946,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
}
} else {
// Enqueue and/or dequeue tx
- qpid::qls_jrnl::txn_map& tmap = jc->get_txn_map();
- qpid::qls_jrnl::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map();
+ qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
bool enq = false;
bool deq = false;
- for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
if (j->enq_flag_ && j->rid_ == rid) enq = true;
else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
}
@@ -977,21 +975,21 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
aio_sleep_cnt = 0;
break;
}
- case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT:
+ case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()");
::usleep(AIO_SLEEP_TIME_US);
break;
- case qpid::qls_jrnl::RHM_IORES_EMPTY:
+ case qpid::linearstore::journal::RHM_IORES_EMPTY:
read = false;
break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
std::ostringstream oss;
- oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::qls_jrnl::iores_str(res);
+ oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::linearstore::journal::iores_str(res);
THROW_STORE_EXCEPTION(oss.str());
} // switch
} // while
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
}
}
@@ -1000,7 +998,7 @@ qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(
uint64_t /*messageId*/,
unsigned& /*headerSize*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage");
}
int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_,
@@ -1036,9 +1034,8 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_,
void MessageStoreImpl::readTplStore()
{
- QLS_LOG(info, "*** MessageStoreImpl::readTplStore()");
tplRecoverMap.clear();
- qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map();
+ qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map();
DataTokenImpl dtok;
void* dbuff = NULL; size_t dbuffSize = 0;
void* xidbuff = NULL; size_t xidbuffSize = 0;
@@ -1050,9 +1047,9 @@ void MessageStoreImpl::readTplStore()
while (!done) {
dtok.reset();
dtok.set_wstate(DataTokenImpl::ENQ);
- qpid::qls_jrnl::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+ qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
switch (res) {
- case qpid::qls_jrnl::RHM_IORES_SUCCESS: {
+ case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
// Every TPL record contains both data and an XID
assert(dbuffSize>0);
assert(xidbuffSize>0);
@@ -1060,7 +1057,7 @@ void MessageStoreImpl::readTplStore()
bool is2PC = *(static_cast<char*>(dbuff)) != 0;
// Check transaction details; add to recover map
- qpid::qls_jrnl::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
if (!txnList.empty()) { // xid found in tmap
unsigned enqCnt = 0;
unsigned deqCnt = 0;
@@ -1070,7 +1067,7 @@ void MessageStoreImpl::readTplStore()
// Note: will apply to both 1PC and 2PC transactions.
bool commitFlag = true;
- for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
if (j->enq_flag_) {
rid = j->rid_;
enqCnt++;
@@ -1088,31 +1085,30 @@ void MessageStoreImpl::readTplStore()
aio_sleep_cnt = 0;
break;
}
- case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT:
+ case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()");
::usleep(AIO_SLEEP_TIME_US);
break;
- case qpid::qls_jrnl::RHM_IORES_EMPTY:
+ case qpid::linearstore::journal::RHM_IORES_EMPTY:
done = true;
break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
std::ostringstream oss;
- oss << "readTplStore(): Unexpected result from journal read: " << qpid::qls_jrnl::iores_str(res);
+ oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res);
THROW_STORE_EXCEPTION(oss.str());
} // switch
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
}
}
void MessageStoreImpl::recoverTplStore()
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()");
- if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) {
+ if (qpid::linearstore::journal::jdir::exists(tplStorePtr->jrnl_dir())) {
uint64_t thisHighestRid = 0ULL;
- tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0);
+ tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0);
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
@@ -1127,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore()
void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()");
if (!tplStorePtr->is_ready())
recoverTplStore();
@@ -1143,7 +1138,6 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
{
- QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()");
if (tplStorePtr->is_ready()) {
readTplStore();
} else {
@@ -1158,18 +1152,18 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage");
}
void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy");
}
void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/,
const std::string& /*data*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
}
void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
@@ -1178,7 +1172,7 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue
uint64_t /*offset*/,
uint32_t /*length*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent");
}
void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
@@ -1193,7 +1187,7 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
// TODO: check if this result should be used...
/*mrg::journal::iores res =*/ jc->flush();
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
}
@@ -1268,7 +1262,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_,
} else {
THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue_->getName() + ": MessageStoreImpl::store() failed: " +
e.what());
}
@@ -1328,7 +1322,7 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
} else {
jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
ddtokp->release();
THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": async_dequeue() failed: " + e.what());
}
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index a136f0ba80..fd2ab603d1 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
-#define QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
+#ifndef QPID_LINEARSTORE_MESSAGESTOREIMPL_H
+#define QPID_LINEARSTORE_MESSAGESTOREIMPL_H
#include <iomanip>
#include <string>
@@ -133,8 +133,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
IdSequence generalIdSequence;
IdSequence messageIdSequence;
std::string storeDir;
- qpid::qls_jrnl::efpPartitionNumber_t defaultEfpPartitionNumber;
- qpid::qls_jrnl::efpDataSize_kib_t defaultEfpFileSize_kib;
+ qpid::linearstore::journal::efpPartitionNumber_t defaultEfpPartitionNumber;
+ qpid::linearstore::journal::efpDataSize_kib_t defaultEfpFileSize_kib;
bool truncateFlag;
uint32_t wCachePgSizeSblks;
uint16_t wCacheNumPages;
@@ -145,7 +145,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
const char* envPath;
qpid::broker::Broker* broker;
JournalLogImpl jrnlLog;
- boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpMgr;
+ boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpMgr;
qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject;
qpid::management::ManagementAgent* agent;
@@ -156,9 +156,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
const std::string& paramName/*,
const uint16_t jrnlFsizePgs*/);
static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB);
- static qpid::qls_jrnl::efpPartitionNumber_t chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition,
+ static qpid::linearstore::journal::efpPartitionNumber_t chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition,
const std::string& paramName);
- static qpid::qls_jrnl::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKiB,
+ static qpid::linearstore::journal::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKiB,
const std::string& paramName);
void init();
@@ -233,8 +233,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
std::string getJrnlDir(const std::string& queueName);
- qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpDataSize_kib_t s);
- qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args);
+ qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::linearstore::journal::efpPartitionNumber_t p, const qpid::linearstore::journal::efpDataSize_kib_t s);
+ qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args);
std::string getStoreTopLevelDir();
std::string getJrnlBaseDir();
std::string getBdbBaseDir();
@@ -268,8 +268,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
bool init(const qpid::Options* options);
bool init(const std::string& dir,
- qpid::qls_jrnl::efpPartitionNumber_t efpPartition = defEfpPartition,
- qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib,
+ qpid::linearstore::journal::efpPartitionNumber_t efpPartition = defEfpPartition,
+ qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib,
const bool truncateFlag = false,
uint32_t wCachePageSize = defWCachePageSizeKib,
uint32_t tplWCachePageSize = defTplWCachePageSizeKib);
@@ -365,4 +365,4 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
} // namespace msgstore
} // namespace mrg
-#endif // ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
+#endif // ifndef QPID_LINEARSTORE_MESSAGESTOREIMPL_H
diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
index 9888342579..9e401c2a30 100644
--- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
+++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_PREPAREDTRANSACTION_H
-#define QPID_LEGACYSTORE_PREPAREDTRANSACTION_H
+#ifndef QPID_LINEARSTORE_PREPAREDTRANSACTION_H
+#define QPID_LINEARSTORE_PREPAREDTRANSACTION_H
#include <list>
#include <map>
@@ -72,4 +72,4 @@ struct PreparedTransaction
}}
-#endif // ifndef QPID_LEGACYSTORE_PREPAREDTRANSACTION_H
+#endif // ifndef QPID_LINEARSTORE_PREPAREDTRANSACTION_H
diff --git a/qpid/cpp/src/qpid/linearstore/StoreException.h b/qpid/cpp/src/qpid/linearstore/StoreException.h
index 9accfd0bd9..5d577d5779 100644
--- a/qpid/cpp/src/qpid/linearstore/StoreException.h
+++ b/qpid/cpp/src/qpid/linearstore/StoreException.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_STOREEXCEPTION_H
-#define QPID_LEGACYSTORE_STOREEXCEPTION_H
+#ifndef QPID_LINEARSTORE_STOREEXCEPTION_H
+#define QPID_LINEARSTORE_STOREEXCEPTION_H
#include "qpid/linearstore/IdDbt.h"
#include <boost/format.hpp>
@@ -53,4 +53,4 @@ public:
}}
-#endif // ifndef QPID_LEGACYSTORE_STOREEXCEPTION_H
+#endif // ifndef QPID_LINEARSTORE_STOREEXCEPTION_H
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
index 022d9c13e2..945f190608 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
@@ -52,7 +52,7 @@ void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) {
} else {
jc->txn_abort(dtokp.get(), getXid());
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
}
@@ -104,10 +104,10 @@ void TxnCtxt::sync() {
if (preparedXidStorePtr)
jrnl_flush(preparedXidStorePtr);
for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++)
- jrnl_sync(static_cast<JournalImpl*>(*i), &qpid::qls_jrnl::jcntl::_aio_cmpl_timeout);
+ jrnl_sync(static_cast<JournalImpl*>(*i), &qpid::linearstore::journal::jcntl::_aio_cmpl_timeout);
if (preparedXidStorePtr)
- jrnl_sync(preparedXidStorePtr, &qpid::qls_jrnl::jcntl::_aio_cmpl_timeout);
- } catch (const qpid::qls_jrnl::jexception& e) {
+ jrnl_sync(preparedXidStorePtr, &qpid::linearstore::journal::jcntl::_aio_cmpl_timeout);
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") + e.what());
}
}
@@ -122,7 +122,7 @@ void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {
if (!jc || jc->is_txn_synced(getXid()))
return;
while (jc->get_wr_aio_evt_rem()) {
- if (jc->get_wr_events(timeout) == qpid::qls_jrnl::jerrno::AIO_TIMEOUT && timeout)
+ if (jc->get_wr_events(timeout) == qpid::linearstore::journal::jerrno::AIO_TIMEOUT && timeout)
THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()"));
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
index 4c009d7a94..5a4350b128 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_TXNCTXT_H
-#define QPID_LEGACYSTORE_TXNCTXT_H
+#ifndef QPID_LINEARSTORE_TXNCTXT_H
+#define QPID_LINEARSTORE_TXNCTXT_H
#include "db-inc.h"
#include <memory>
@@ -112,6 +112,6 @@ class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
}}
-#endif // ifndef QPID_LEGACYSTORE_TXNCTXT_H
+#endif // ifndef QPID_LINEARSTORE_TXNCTXT_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
index 82d38e716e..7181a43857 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
@@ -19,14 +19,15 @@
*
*/
-#ifndef QPID_LINEARSTORE_ATOMICCOUNTER_H_
-#define QPID_LINEARSTORE_ATOMICCOUNTER_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_ATOMICCOUNTER_H_
+#define QPID_LINEARSTORE_JOURNAL_ATOMICCOUNTER_H_
#include "qpid/linearstore/jrnl/slock.h"
#include <string>
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
template <class T>
class AtomicCounter
@@ -127,6 +128,6 @@ public:
}
};
-}} // namespace qpid::qls_jrnl
+}}} // namespace qpid::qls_jrnl
-#endif // QPID_LINEARSTORE_ATOMICCOUNTER_H_
+#endif // QPID_LINEARSTORE_JOURNAL_ATOMICCOUNTER_H_
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
index 015c2cbafe..da3d63f462 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
@@ -38,7 +38,8 @@
//#include <iostream> // DEBUG
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
const EmptyFilePoolPartition* partitionPtr,
@@ -338,4 +339,4 @@ int EmptyFilePool::moveEmptyFile(const std::string& from,
return 0;
}
-}} // namespace qpid::qls_jrnl
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
index cb6887a095..a866d0dea4 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_QLS_JRNL_EMPTYFILEPOOL_H_
-#define QPID_QLS_JRNL_EMPTYFILEPOOL_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOL_H_
+#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOL_H_
namespace qpid {
-namespace qls_jrnl {
-
+namespace linearstore {
+namespace journal {
class EmptyFilePool;
-
-}} // namespace qpid::qls_jrnl
+}}}
#include <deque>
#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
@@ -35,7 +34,8 @@ namespace qls_jrnl {
#include <string>
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
class EmptyFilePoolPartition;
class jdir;
class JournalFile;
@@ -92,6 +92,6 @@ protected:
const std::string& toFqPath);
};
-}} // namespace qpid::qls_jrnl
+}}}
-#endif /* QPID_QLS_JRNL_EMPTYFILEPOOL_H_ */
+#endif /* QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOL_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
index 3c2d7d69a2..4b8602318d 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
@@ -31,7 +31,8 @@
//#include <iostream> // DEBUG
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath,
const efpPartitionNumber_t defaultPartitionNumber,
@@ -92,8 +93,8 @@ void EmptyFilePoolManager::findEfpPartitions() {
}
journalLogRef_.log(JournalLog::LOG_NOTICE, "EFP Manager initialization complete");
- std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList;
- std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList;
+ std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*> partitionList;
+ std::vector<qpid::linearstore::journal::EmptyFilePool*> filePoolList;
getEfpPartitions(partitionList);
if (partitionList.size() == 0) {
journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible.");
@@ -101,14 +102,14 @@ void EmptyFilePoolManager::findEfpPartitions() {
std::stringstream oss;
oss << "> EFP Partitions found: " << partitionList.size();
journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
- for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
+ for (std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
filePoolList.clear();
(*i)->getEmptyFilePools(filePoolList);
std::stringstream oss;
oss << " * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size()
<< " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'";
journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
- for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
+ for (std::vector<qpid::linearstore::journal::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
std::ostringstream oss;
oss << " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
" files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB";
@@ -208,4 +209,4 @@ uint16_t EmptyFilePoolManager::getNumEfpPartitions() const {
return partitionMap_.size();
}
-}} // namespace qpid::qls_jrnl
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
index d5fab82800..1b1f293a1c 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
@@ -27,7 +27,8 @@
#include "qpid/linearstore/jrnl/smutex.h"
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
class EmptyFilePoolManager
{
@@ -66,6 +67,6 @@ public:
uint16_t getNumEfpPartitions() const;
};
-}} // namespace qpid::qls_jrnl
+}}}
#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
index d38db60975..2d84ba4296 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
@@ -31,7 +31,8 @@
//#include <iostream> // DEBUG
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
// static
const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition
@@ -151,4 +152,4 @@ void EmptyFilePoolPartition::validatePartitionDir() {
// TODO: other validity checks here
}
-}} // namespace qpid::qls_jrnl
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
index a9ca36d07b..9a2b5a5d75 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_
-#define QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_
+#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_
namespace qpid {
-namespace qls_jrnl {
-
+namespace linearstore {
+namespace journal {
class EmptyFilePoolPartition;
-
-}} // namespace qpid::qls_jrnl
+}}}
#include "qpid/linearstore/jrnl/EmptyFilePool.h"
#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
@@ -37,7 +36,8 @@ namespace qls_jrnl {
#include <vector>
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
class JournalLog;
class EmptyFilePoolPartition
@@ -75,6 +75,6 @@ protected:
void validatePartitionDir();
};
-}} // namespace qpid::qls_jrnl
+}}}
-#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ */
+#endif /* QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
index 2f9693fd95..d8e8225697 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
@@ -19,15 +19,16 @@
*
*/
-#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
-#define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_
+#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_
#include <iostream>
#include <stdint.h>
#include <utility> // std::pair
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
typedef uint64_t efpDataSize_kib_t; ///< Size of data part of file (excluding file header) in kib
typedef uint64_t efpFileSize_kib_t; ///< Size of file (header + data) in kib
@@ -41,9 +42,10 @@ typedef struct efpIdentity_t {
efpDataSize_kib_t ds_;
efpIdentity_t() : pn_(0), ds_(0) {}
efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {}
+ efpIdentity_t(const efpIdentity_t& ei) : pn_(ei.pn_), ds_(ei.ds_) {}
friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; }
} efpIdentity_t;
-}} // namespace qpid::qls_jrnl
+}}}
-#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ */
+#endif /* QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_ */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
index 3de7b6c1e2..1e6c10eea7 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
@@ -31,18 +31,23 @@
//#include <iostream> // DEBUG
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
JournalFile::JournalFile(const std::string& fqFileName,
- const ::file_hdr_t& fileHeader) :
+ const efpIdentity_t& efpIdentity,
+ const uint64_t fileSeqNum) :
+ efpIdentity_(efpIdentity),
fqFileName_(fqFileName),
- fileSeqNum_(fileHeader._file_number),
+ fileSeqNum_(fileSeqNum),
+ serial_(getRandom64()),
+ firstRecordOffset_(0ULL),
fileHandle_(-1),
fileCloseFlag_(false),
fileHeaderBasePtr_ (0),
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
- fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
submittedDblkCount_("JournalFile::submittedDblkCount", 0),
completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -50,16 +55,18 @@ JournalFile::JournalFile(const std::string& fqFileName,
{}
JournalFile::JournalFile(const std::string& fqFileName,
- const uint64_t fileSeqNum,
- const efpDataSize_kib_t efpDataSize_kib) :
+ const ::file_hdr_t& fileHeader) :
+ efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib),
fqFileName_(fqFileName),
- fileSeqNum_(fileSeqNum),
+ fileSeqNum_(fileHeader._file_number),
+ serial_(fileHeader._rhdr._serial),
+ firstRecordOffset_(fileHeader._fro),
fileHandle_(-1),
fileCloseFlag_(false),
fileHeaderBasePtr_ (0),
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
- fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
submittedDblkCount_("JournalFile::submittedDblkCount", 0),
completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -108,6 +115,10 @@ uint64_t JournalFile::getFileSeqNum() const {
return fileSeqNum_;
}
+uint64_t JournalFile::getSerial() const {
+ return serial_;
+}
+
int JournalFile::open() {
fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
if (fileHandle_ < 0) {
@@ -141,10 +152,12 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
const uint64_t recordId,
const uint64_t firstRecordOffset,
const std::string queueName) {
+ firstRecordOffset_ = firstRecordOffset;
::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib);
::file_hdr_init(fileHeaderBasePtr_,
QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
userFlags,
+ serial_,
recordId,
firstRecordOffset,
fileSeqNum_,
@@ -208,6 +221,18 @@ uint16_t JournalFile::decrOutstandingAioOperationCount() {
return r;
}
+efpIdentity_t JournalFile::getEfpIdentity() const {
+ return efpIdentity_;
+}
+
+uint64_t JournalFile::getFirstRecordOffset() const {
+ return firstRecordOffset_;
+}
+
+void JournalFile::setFirstRecordOffset(const uint64_t firstRecordOffset) {
+ firstRecordOffset_ = firstRecordOffset;
+}
+
// --- Status helper functions ---
bool JournalFile::isEmpty() const {
@@ -253,6 +278,22 @@ const std::string JournalFile::getFileName() const {
return fqFileName_.substr(fqFileName_.rfind('/')+1);
}
+//static
+uint64_t JournalFile::getRandom64() {
+ int randomData = ::open("/dev/random", O_RDONLY);
+ if (randomData < 0) {
+ throw jexception(); // TODO: Complete exception details
+ }
+ uint64_t randomNumber;
+ ::size_t size = sizeof(randomNumber);
+ ::ssize_t result = ::read(randomData, (char*)&randomNumber, size);
+ if (result < 0 || result != ssize_t(size)) {
+ throw jexception(); // TODO: Complete exception details
+ }
+ ::close(randomData);
+ return randomNumber;
+}
+
bool JournalFile::isOpen() const {
return fileHandle_ >= 0;
}
@@ -298,4 +339,4 @@ bool JournalFile::isFullAndComplete() const {
}
-}} // namespace qpid::qls_jrnl
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
index 0abbc7bcca..687b2c4866 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LINEARSTORE_JOURNALFILE_H_
-#define QPID_LINEARSTORE_JOURNALFILE_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_JOURNALFILE_H_
+#define QPID_LINEARSTORE_JOURNAL_JOURNALFILE_H_
#include "qpid/linearstore/jrnl/aio.h"
#include "qpid/linearstore/jrnl/AtomicCounter.h"
@@ -31,13 +31,17 @@
class file_hdr_t;
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
class JournalFile
{
protected:
+ const efpIdentity_t efpIdentity_;
const std::string fqFileName_;
const uint64_t fileSeqNum_;
+ const uint64_t serial_;
+ uint64_t firstRecordOffset_;
int fileHandle_;
bool fileCloseFlag_;
void* fileHeaderBasePtr_;
@@ -51,11 +55,13 @@ protected:
AtomicCounter<uint16_t> outstandingAioOpsCount_; ///< Outstanding AIO operations on this file
public:
+ // Constructor for creating new file with known fileSeqNum and random serial
JournalFile(const std::string& fqFileName,
- const ::file_hdr_t& fileHeader);
+ const efpIdentity_t& efpIdentity,
+ const uint64_t fileSeqNum);
+ // Constructor for recovery in which fileSeqNum and serial are recovered from fileHeader param
JournalFile(const std::string& fqFileName,
- const uint64_t fileSeqNum,
- const efpDataSize_kib_t efpDataSize_kib);
+ const ::file_hdr_t& fileHeader);
virtual ~JournalFile();
void initialize(const uint32_t completedDblkCount);
@@ -63,6 +69,7 @@ public:
const std::string getFqFileName() const;
uint64_t getFileSeqNum() const;
+ uint64_t getSerial() const;
int open();
void close();
@@ -87,6 +94,10 @@ public:
uint16_t getOutstandingAioOperationCount() const;
uint16_t decrOutstandingAioOperationCount();
+ efpIdentity_t getEfpIdentity() const;
+ uint64_t getFirstRecordOffset() const;
+ void setFirstRecordOffset(const uint64_t firstRecordOffset);
+
// Status helper functions
bool isEmpty() const; ///< True if no writes of any kind have occurred
bool isNoEnqueuedRecordsRemaining() const; ///< True when all enqueued records (or parts) have been dequeued
@@ -97,6 +108,7 @@ public:
protected:
const std::string getDirectory() const;
const std::string getFileName() const;
+ static uint64_t getRandom64();
bool isOpen() const;
uint32_t getSubmittedDblkCount() const;
@@ -114,6 +126,6 @@ protected:
bool isFullAndComplete() const; ///< True if all submitted dblks have returned from AIO
};
-}} // namespace qpid::qls_jrnl
+}}}
-#endif // QPID_LINEARSTORE_JOURNALFILE_H_
+#endif // QPID_LINEARSTORE_JOURNAL_JOURNALFILE_H_
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
index 15222e3a47..96a6432624 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
@@ -23,7 +23,8 @@
#include <iostream>
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
JournalLog::JournalLog(log_level_t logLevelThreshold) : logLevelThreshold_(logLevelThreshold) {}
@@ -58,4 +59,4 @@ const char* JournalLog::log_level_str(log_level_t logLevel) {
return "<log level unknown>";
}
-}} // namespace qpid::qls_jrnl
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
index b5fcdbc256..cf503cb9d2 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
@@ -19,13 +19,14 @@
*
*/
-#ifndef QPID_LINEARSTORE_JOURNALLOG_H_
-#define QPID_LINEARSTORE_JOURNALLOG_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_JOURNALLOG_H_
+#define QPID_LINEARSTORE_JOURNAL_JOURNALLOG_H_
#include <string>
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
class JournalLog
{
@@ -54,6 +55,6 @@ public:
static const char* log_level_str(const log_level_t logLevel);
};
-}} // namespace qpid::qls_jrnl
+}}}
-#endif // QPID_LINEARSTORE_JOURNALLOG_H_
+#endif // QPID_LINEARSTORE_JOURNAL_JOURNALLOG_H_
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
index 2c1327d645..9daebaba80 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
@@ -32,7 +32,8 @@
//#include <iostream> // DEBUG
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
LinearFileController::LinearFileController(jcntl& jcntlRef) :
jcntlRef_(jcntlRef),
@@ -90,7 +91,7 @@ void LinearFileController::pullEmptyFileFromEfp() {
currentJournalFilePtr_->close();
std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name
//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
- addJournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0);
+ addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
}
void LinearFileController::purgeEmptyFilesToEfp() {
@@ -149,6 +150,11 @@ uint64_t LinearFileController::getCurrentFileSeqNum() const {
return currentJournalFilePtr_->getFileSeqNum();
}
+uint64_t LinearFileController::getCurrentSerial() const {
+ assertCurrentJournalFileValid("getCurrentSerial");
+ return currentJournalFilePtr_->getSerial();
+}
+
bool LinearFileController::isEmpty() const {
assertCurrentJournalFileValid("isEmpty");
return currentJournalFilePtr_->isEmpty();
@@ -173,10 +179,10 @@ const std::string LinearFileController::status(const uint8_t indentDepth) const
// --- protected functions ---
void LinearFileController::addJournalFile(const std::string& fileName,
+ const efpIdentity_t& efpIdentity,
const uint64_t fileNumber,
- const uint32_t fileSize_kib,
const uint32_t completedDblkCount) {
- JournalFile* jfp = new JournalFile(fileName, fileNumber, fileSize_kib);
+ JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber);
addJournalFile(jfp, completedDblkCount);
}
@@ -219,4 +225,4 @@ void LinearFileController::purgeEmptyFilesToEfpNoLock() {
}
}
-}} // namespace qpid::qls_jrnl
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
index 83d360dad8..1e8d9c9200 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
-#define QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_LINEARFILECONTROLLER_H_
+#define QPID_LINEARSTORE_JOURNAL_LINEARFILECONTROLLER_H_
#include <deque>
#include "qpid/linearstore/jrnl/AtomicCounter.h"
@@ -31,7 +31,8 @@ typedef struct io_context* io_context_t;
typedef struct iocb aio_cb;
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
class EmptyFilePool;
class jcntl;
@@ -90,6 +91,7 @@ public:
uint32_t dataSize_dblks);
uint64_t getCurrentFileSeqNum() const;
+ uint64_t getCurrentSerial() const;
bool isEmpty() const;
// Debug aid
@@ -97,8 +99,8 @@ public:
protected:
void addJournalFile(const std::string& fileName,
+ const efpIdentity_t& efpIdentity,
const uint64_t fileNumber,
- const uint32_t fileSize_kib,
const uint32_t completedDblkCount);
void assertCurrentJournalFileValid(const char* const functionName) const;
bool checkCurrentJournalFileValid() const;
@@ -110,6 +112,6 @@ protected:
typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr,
const uint32_t completedDblkCount);
-}} // namespace qpid::qls_jrnl
+}}}
-#endif // QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
+#endif // QPID_LINEARSTORE_JOURNAL_LINEARFILECONTROLLER_H_
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
index cba560750b..361ee1aeda 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
@@ -44,8 +44,8 @@
#include <vector>
namespace qpid {
-namespace qls_jrnl
-{
+namespace linearstore {
+namespace journal {
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
@@ -63,6 +63,7 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory,
highestRecordId_(0ULL),
highestFileNumber_(0ULL),
lastFileFullFlag_(false),
+ currentSerial_(0),
efpFileSize_kib_(0)
{}
@@ -154,9 +155,10 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
}
enq_map::emap_data_struct_t eds;
enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
- uint64_t fileNumber = eds._pfid;
- currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
- getNextFile(false);
+ if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) {
+ getFile(eds._pfid, false);
+ }
+//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG
inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
if (!inFileStream_.good()) {
@@ -174,7 +176,10 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
// check flags
transient = ::is_enq_transient(&enqueueHeader);
external = ::is_enq_external(&enqueueHeader);
-
+//char magicBuff[5]; // DEBUG
+//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG
+//magicBuff[4] = 0; // DEBUG
+//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG
// read xid
xidSize = enqueueHeader._xidsize;
*xidPtrPtr = ::malloc(xidSize);
@@ -217,57 +222,75 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
}
}
-std::string RecoveryManager::toString(const std::string& jid,
- bool compact) {
+std::string RecoveryManager::toString(const std::string& jid) {
std::ostringstream oss;
- if (compact) {
- oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
- oss << " jfl=[";
- for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) {
- if (i!=fileNumberMap_.begin()) {
- oss << " ";
- }
- std::string fqFileName = i->second->getFqFileName();
- oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1);
- }
- oss << "] ecl=[ ";
- for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) {
- if (j!=fileNumberMap_.begin()) {
- oss << " ";
- }
- oss << j->second->getEnqueuedRecordCount();
+ oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
+ oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
+ oss << " Journal File List:" << std::endl;
+ for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
+ std::string fqFileName = k->second->getFqFileName();
+ oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
+ }
+ oss << " Enqueue Counts: [ ";
+ for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
+ if (l != fileNumberMap_.begin()) {
+ oss << ", ";
}
- oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
- oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
- oss << " eo=0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
- oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec;
- oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec;
- oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
+ oss << l->second->getEnqueuedRecordCount();
+ }
+ oss << " ]" << std::endl;
+ oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+ std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
+ (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+ oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
+ oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << " Enqueued records (txn & non-txn):" << std::endl;
+ return oss.str();
+}
+
+std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
+ std::string indentStr(indent, ' ');
+ std::ostringstream oss;
+ oss << std::endl << indentStr << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl;
+ if (journalEmptyFlag_) {
+ oss << indentStr << "<Journal empty, no journal files found>" << std::endl;
} else {
- oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
- oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
- oss << " Journal File List:" << std::endl;
+ oss << indentStr << std::setw(7) << "file_id"
+ << std::setw(43) << "file_name"
+ << std::setw(16) << "fro"
+ << std::setw(12) << "record_cnt"
+ << std::setw(5) << "ptn"
+ << std::setw(10) << "efp"
+ << std::endl;
+ oss << indentStr << std::setw(7) << "-------"
+ << std::setw(43) << "-----------------------------------------"
+ << std::setw(16) << "--------------"
+ << std::setw(12) << "----------"
+ << std::setw(5) << "---"
+ << std::setw(10) << "--------"
+ << std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
std::string fqFileName = k->second->getFqFileName();
- oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
- }
- oss << " Enqueue Counts: [ " << std::endl;
- for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
- if (l != fileNumberMap_.begin()) {
- oss << ", ";
- }
- oss << l->second->getEnqueuedRecordCount();
+ std::ostringstream fro;
+ fro << std::hex << "0x" << k->second->getFirstRecordOffset();
+ oss << indentStr << std::setw(7) << k->first
+ << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
+ << std::setw(16) << fro.str()
+ << std::setw(12) << k->second->getEnqueuedRecordCount()
+ << std::setw(5) << k->second->getEfpIdentity().pn_
+ << std::setw(9) << k->second->getEfpIdentity().ds_ << "k"
+ << std::endl;
}
- oss << " ]" << std::endl;
- oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+ oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
+ oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " (" <<
(endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
- oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
- oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " Enqueued records (txn & non-txn):" << std::endl;
+ oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+ oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << indentStr << "Enqueued records (txn & non-txn):";
}
return oss.str();
}
@@ -355,9 +378,9 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
}
bool RecoveryManager::decodeRecord(jrec& record,
- std::size_t& cumulativeSizeRead,
- ::rec_hdr_t& headerRecord,
- std::streampos& fileOffset)
+ std::size_t& cumulativeSizeRead,
+ ::rec_hdr_t& headerRecord,
+ std::streampos& fileOffset)
{
std::streampos start_file_offs = fileOffset;
@@ -370,15 +393,18 @@ bool RecoveryManager::decodeRecord(jrec& record,
bool done = false;
while (!done) {
try {
- done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+ done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
}
catch (const jexception& e) {
+ journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
checkJournalAlignment(start_file_offs);
return false;
}
- if (!done && !getNextFile(false)) {
- checkJournalAlignment(start_file_offs);
- return false;
+ if (!done && needNextFile()) {
+ if (!getNextFile(false)) {
+ checkJournalAlignment(start_file_offs);
+ return false;
+ }
}
}
return true;
@@ -392,45 +418,50 @@ uint64_t RecoveryManager::getCurrentFileNumber() const {
return currentJournalFileConstItr_->first;
}
-bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) {
if (inFileStream_.is_open()) {
- if (inFileStream_.eof() || !inFileStream_.good()) {
- inFileStream_.clear();
- endOffset_ = inFileStream_.tellg(); // remember file offset before closing
- if (endOffset_ == -1) {
- std::ostringstream oss;
- oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
- throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile");
- }
- inFileStream_.close();
- if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
- return false;
- }
- }
- }
- if (!inFileStream_.is_open()) {
+ inFileStream_.close();
+//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
- inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
- if (!inFileStream_.good()) {
- throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
- }
+ }
+ currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
+ if (currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ return false;
+ }
+ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+ if (!inFileStream_.good()) {
+ throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getFile");
+ }
+//std::cout << " [F=" << getCurrentFileName() << std::flush; // DEBUG
- // Read file header
- file_hdr_t fhdr;
- inFileStream_.read((char*)&fhdr, sizeof(fhdr));
- checkFileStreamOk(true);
- if (fhdr._rhdr._magic == QLS_FILE_MAGIC) {
- firstRecordOffset_ = fhdr._fro;
- std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES;
- inFileStream_.seekg(foffs);
- } else {
- inFileStream_.close();
- if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
- journalEmptyFlag_ = true;
- }
+ if (!readFileHeader()) {
+ return false;
+ }
+ std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ inFileStream_.seekg(foffs);
+ return true;
+}
+
+bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+ if (inFileStream_.is_open()) {
+ inFileStream_.close();
+//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
+ if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
return false;
}
+ inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
+ }
+ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+ if (!inFileStream_.good()) {
+ throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
+ }
+//std::cout << " [.F=" << getCurrentFileName() << std::flush; // DEBUG
+
+ if (!readFileHeader()) {
+ return false;
}
+ std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ inFileStream_.seekg(foffs);
return true;
}
@@ -443,7 +474,7 @@ bool RecoveryManager::getNextRecordHeader()
bool hdr_ok = false;
std::streampos file_pos;
while (!hdr_ok) {
- if (!inFileStream_.is_open()) {
+ if (needNextFile()) {
if (!getNextFile(true)) {
return false;
}
@@ -458,8 +489,10 @@ bool RecoveryManager::getNextRecordHeader()
if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
hdr_ok = true;
} else {
- if (!getNextFile(true)) {
- return false;
+ if (needNextFile()) {
+ if (!getNextFile(true)) {
+ return false;
+ }
}
}
}
@@ -468,6 +501,10 @@ bool RecoveryManager::getNextRecordHeader()
case QLS_ENQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
enq_rec er;
uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
if (!decodeRecord(er, cum_size_read, h, file_pos)) {
@@ -502,6 +539,10 @@ bool RecoveryManager::getNextRecordHeader()
case QLS_DEQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
deq_rec dr;
uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
@@ -534,6 +575,10 @@ bool RecoveryManager::getNextRecordHeader()
case QLS_TXA_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
txn_rec ar;
if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
return false;
@@ -558,6 +603,10 @@ bool RecoveryManager::getNextRecordHeader()
case QLS_TXC_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
txn_rec cr;
if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
return false;
@@ -593,8 +642,10 @@ bool RecoveryManager::getNextRecordHeader()
uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
checkFileStreamOk(false);
- if (!getNextFile(false)) {
- return false;
+ if (needNextFile()) {
+ if (!getNextFile(false)) {
+ return false;
+ }
}
}
break;
@@ -611,6 +662,13 @@ bool RecoveryManager::getNextRecordHeader()
return true;
}
+bool RecoveryManager::needNextFile() {
+ if (inFileStream_.is_open()) {
+ return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024);
+ }
+ return true;
+}
+
void RecoveryManager::readJournalData(char* target,
const std::streamsize readSize) {
std::streamoff bytesRead = 0;
@@ -624,7 +682,9 @@ void RecoveryManager::readJournalData(char* target,
inFileStream_.read(target + bytesRead, readSize - bytesRead);
std::streamoff thisReadSize = inFileStream_.gcount();
if (thisReadSize < readSize) {
- getNextFile(false);
+ if (needNextFile()) {
+ getNextFile(false);
+ }
file_pos = inFileStream_.tellg();
if (file_pos == std::streampos(-1)) {
std::ostringstream oss;
@@ -636,6 +696,23 @@ void RecoveryManager::readJournalData(char* target,
}
}
+bool RecoveryManager::readFileHeader() {
+ file_hdr_t fhdr;
+ inFileStream_.read((char*)&fhdr, sizeof(fhdr));
+ checkFileStreamOk(true);
+ if (::file_hdr_check(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, efpFileSize_kib_) != 0) {
+ firstRecordOffset_ = fhdr._fro;
+ currentSerial_ = fhdr._rhdr._serial;
+ } else {
+ inFileStream_.close();
+ if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
+ journalEmptyFlag_ = true;
+ }
+ return false;
+ }
+ return true;
+}
+
// static private
void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
::file_hdr_t& fileHeaderRef,
@@ -670,4 +747,4 @@ void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
}
}
-}} // namespace qpid::qls_jrnl
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
index 3cacd7cfb3..7655eb6831 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_
-#define QPID_LINEARSTORE_RECOVERYSTATE_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
+#define QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
#include <deque>
#include <fstream>
@@ -33,7 +33,8 @@ struct file_hdr_t;
struct rec_hdr_t;
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
class data_tok;
class enq_map;
@@ -72,6 +73,7 @@ protected:
bool lastFileFullFlag_; ///< Last file is full
// State for recovery of individual enqueued records
+ uint64_t currentSerial_;
uint32_t efpFileSize_kib_;
fileNumberMapConstItr_t currentJournalFileConstItr_;
std::string currentFileName_;
@@ -104,8 +106,8 @@ public:
bool ignore_pending_txns);
void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr);
- std::string toString(const std::string& jid,
- bool compact = true);
+ std::string toString(const std::string& jid);
+ std::string toLog(const std::string& jid, const int indent);
protected:
void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
void checkFileStreamOk(bool checkEof);
@@ -116,8 +118,11 @@ protected:
std::streampos& fileOffset);
std::string getCurrentFileName() const;
uint64_t getCurrentFileNumber() const;
+ bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag);
bool getNextFile(bool jumpToFirstRecordOffsetFlag);
bool getNextRecordHeader();
+ bool needNextFile();
+ bool readFileHeader();
void readJournalData(char* target, const std::streamsize size);
void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
@@ -126,6 +131,6 @@ protected:
std::string& queueName);
};
-}} // namespace qpid::qls_jrnl
+}}}
-#endif // QPID_LINEARSTORE_RECOVERYSTATE_H_
+#endif // QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/aio.h b/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
index 7be5b83930..3a4a762439 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
@@ -19,17 +19,16 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_AIO_H
-#define QPID_LEGACYSTORE_JRNL_AIO_H
+#ifndef QPID_LINEARSTORE_JOURNAL_AIO_H
+#define QPID_LINEARSTORE_JOURNAL_AIO_H
#include <libaio.h>
#include <cstring>
#include <string.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
typedef iocb aio_cb;
typedef io_event aio_event;
@@ -135,6 +134,6 @@ public:
}
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_AIO_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h b/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
index 975d114f1f..f21b62617b 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
@@ -19,27 +19,26 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
-#define QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#ifndef QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H
+#define QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H
#include <stdint.h>
#include <vector>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
- class data_tok;
+class data_tok;
- class aio_callback
- {
- public:
- virtual ~aio_callback() {}
- virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
- virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0;
- };
+class aio_callback
+{
+public:
+ virtual ~aio_callback() {}
+ virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
+ virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0;
+};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h b/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
index c22dcb8c1c..243c817946 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
-#define QPID_LEGACYSTORE_JRNL_CVAR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H
+#define QPID_LINEARSTORE_JOURNAL_CVAR_H
#include <cstring>
#include "qpid/linearstore/jrnl/jerrno.h"
@@ -30,10 +30,9 @@
#include <pthread.h>
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// Ultra-simple thread condition variable class
class cvar
@@ -71,6 +70,6 @@ namespace qls_jrnl
}
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
index ab2a763f17..a07a89b02a 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
@@ -27,10 +27,9 @@
#include "qpid/linearstore/jrnl/slock.h"
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// Static members
@@ -39,10 +38,8 @@ smutex data_tok::_mutex;
data_tok::data_tok():
_wstate(NONE),
-// _rstate(UNREAD),
_dsize(0),
_dblks_written(0),
-// _dblks_read(0),
_pg_cnt(0),
_fid(0),
_rid(0),
@@ -106,58 +103,12 @@ data_tok::wstate_str(write_state wstate)
return "<wstate unknown>";
}
-/*
-const char*
-data_tok::rstate_str() const
-{
- return rstate_str(_rstate);
-}
-*/
-
-/*
-const char*
-data_tok::rstate_str(read_state rstate)
-{
- switch (rstate)
- {
- case NONE:
- return "NONE";
- case READ_PART:
- return "READ_PART";
- case SKIP_PART:
- return "SKIP_PART";
- case READ:
- return "READ";
- // Not using default: forces compiler to ensure all cases are covered.
- }
- return "<rstate unknown>";
-}
-*/
-
-/*
-void
-data_tok::set_rstate(const read_state rstate)
-{
- if (_wstate != ENQ && rstate != UNREAD)
- {
- std::ostringstream oss;
- oss << "Attempted to change read state to " << rstate_str(rstate);
- oss << " while write state is not enqueued (wstate ENQ); wstate=" << wstate_str() << ".";
- throw jexception(jerrno::JERR_DTOK_ILLEGALSTATE, oss.str(), "data_tok",
- "set_rstate");
- }
- _rstate = rstate;
-}
-*/
-
void
data_tok::reset()
{
_wstate = NONE;
-// _rstate = UNREAD;
_dsize = 0;
_dblks_written = 0;
-// _dblks_read = 0;
_pg_cnt = 0;
_fid = 0;
_rid = 0;
@@ -185,4 +136,4 @@ data_tok::status_str() const
return oss.str();
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
index 39dc1c4a81..f727243cb0 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
-#define QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+#ifndef QPID_LINEARSTORE_JOURNAL_DATA_TOK_H
+#define QPID_LINEARSTORE_JOURNAL_DATA_TOK_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class data_tok;
-}}
+}}}
#include <cassert>
#include <cstddef>
@@ -35,10 +34,9 @@ class data_tok;
#include <pthread.h>
#include <string>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \class data_tok
@@ -72,25 +70,13 @@ namespace qls_jrnl
COMMITTED
};
-/*
- enum read_state
- {
- UNREAD, ///< Data block not read
- READ_PART, ///< Data block is part-read; waiting for page buffer to fill
- SKIP_PART, ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
- READ ///< Data block is fully read
- };
-*/
-
protected:
static smutex _mutex;
static uint64_t _cnt;
uint64_t _icnt;
write_state _wstate; ///< Enqueued / dequeued state of data
-// read_state _rstate; ///< Read state of data
std::size_t _dsize; ///< Data size in bytes
uint32_t _dblks_written; ///< Data blocks read/written
-// uint32_t _dblks_read; ///< Data blocks read/written
uint32_t _pg_cnt; ///< Page counter - incr for each page containing part of data
uint64_t _fid; ///< FID containing header of enqueue record
uint64_t _rid; ///< RID of data set by enqueue operation
@@ -106,16 +92,11 @@ namespace qls_jrnl
inline write_state wstate() const { return _wstate; }
const char* wstate_str() const;
static const char* wstate_str(write_state wstate);
-// inline read_state rstate() const { return _rstate; }
-// const char* rstate_str() const;
-// static const char* rstate_str(read_state rstate);
inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; }
inline bool is_enqueued() const { return _wstate == ENQ; }
inline bool is_readable() const { return _wstate == ENQ; }
-// inline bool is_read() const { return _rstate == READ; }
inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; }
inline void set_wstate(const write_state wstate) { _wstate = wstate; }
-// void set_rstate(const read_state rstate);
inline std::size_t dsize() const { return _dsize; }
inline void set_dsize(std::size_t dsize) { _dsize = dsize; }
@@ -124,10 +105,6 @@ namespace qls_jrnl
{ _dblks_written += dblks_written; }
inline void set_dblocks_written(uint32_t dblks_written) { _dblks_written = dblks_written; }
-// inline uint32_t dblocks_read() const { return _dblks_read; }
-// inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; }
-// inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; }
-
inline uint32_t pg_cnt() const { return _pg_cnt; }
inline uint32_t incr_pg_cnt() { return ++_pg_cnt; }
inline uint32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; }
@@ -154,7 +131,6 @@ namespace qls_jrnl
std::string status_str() const;
};
-} // namespace qls_jrnl
-} // namespace jrnl
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_DATA_TOK_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
index 5b251b7908..ca8de61607 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
@@ -19,87 +19,43 @@
*
*/
-/**
- * \file deq_rec.cpp
- *
- * Qpid asynchronous store plugin library
- *
- * This file contains the code for the mrg::journal::deq_rec (journal dequeue
- * record) class. See comments in file deq_rec.h for details.
- *
- * \author Kim van der Riet
- */
-
#include "qpid/linearstore/jrnl/deq_rec.h"
-#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
-#include "qpid/linearstore/jrnl/utils/rec_tail.h"
#include <cassert>
-#include <cerrno>
-#include <cstdlib>
#include <cstring>
#include <iomanip>
-#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
-#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
deq_rec::deq_rec():
-// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false),
_xidp(0),
_buff(0)
-// _deq_tail(_deq_hdr)
{
- ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0);
+ ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0);
::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
}
-deq_rec::deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool txn_coml_commit):
-// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
- _xidp(xidp),
- _buff(0)
-// _deq_tail(_deq_hdr)
-{
- ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, rid, drid, xidlen);
- ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
- ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
-}
-
deq_rec::~deq_rec()
{
clean();
}
void
-deq_rec::reset()
-{
- _deq_hdr._rhdr._rid = 0;
-// _deq_hdr.set_owi(false);
- ::set_txn_coml_commit(&_deq_hdr, false);
- _deq_hdr._deq_rid = 0;
- _deq_hdr._xidsize = 0;
- _deq_tail._checksum = 0;
- _deq_tail._rid = 0;
- _xidp = 0;
- _buff = 0;
-}
-
-void
-deq_rec::reset(const uint64_t rid, const uint64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool txn_coml_commit)
+deq_rec::reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool txn_coml_commit)
{
+ _deq_hdr._rhdr._serial = serial;
_deq_hdr._rhdr._rid = rid;
::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
_deq_hdr._deq_rid = drid;
_deq_hdr._xidsize = xidlen;
- _deq_tail._rid = rid;
_xidp = xidp;
_buff = 0;
+ _deq_tail._serial = serial;
+ _deq_tail._rid = rid;
}
uint32_t
@@ -214,132 +170,16 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
return size_dblks(wr_cnt);
}
-uint32_t
-deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
-{
- assert(rptr != 0);
- assert(max_size_dblks > 0);
-
- std::size_t rd_cnt = 0;
- if (rec_offs_dblks) // Continuation of record on new page
- {
- const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
- const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
- sizeof(rec_tail_t));
- const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
-
- if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid fits within this page
- if (rec_offs - sizeof(deq_hdr_t) < _deq_hdr._xidsize)
- {
- // Part of xid still outstanding, copy remainder of xid and tail
- const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
- const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
- std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
- rd_cnt = xid_rem;
- std::memcpy((void*)&_deq_tail, ((char*)rptr + rd_cnt), sizeof(_deq_tail));
- chk_tail();
- rd_cnt += sizeof(_deq_tail);
- }
- else
- {
- // Tail or part of tail only outstanding, complete tail
- const std::size_t tail_offs = rec_offs - sizeof(deq_hdr_t) - _deq_hdr._xidsize;
- const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
- std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem);
- chk_tail();
- rd_cnt = tail_rem;
- }
- }
- else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid fits within this page, tail split
- const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
- const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
- std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
- rd_cnt += xid_rem;
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Remainder of xid split
- const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
- std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
- rd_cnt += xid_cp_size;
- }
- }
- else // Start of record
- {
- // Get and check header
- //_deq_hdr.hdr_copy(h);
- ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
- rd_cnt = sizeof(rec_hdr_t);
- _deq_hdr._deq_rid = *(uint64_t*)((char*)rptr + rd_cnt);
- rd_cnt += sizeof(uint64_t);
- _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt = sizeof(deq_hdr_t);
- chk_hdr();
- if (_deq_hdr._xidsize)
- {
- _buff = std::malloc(_deq_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "deq_rec", "decode");
- const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
- const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
- sizeof(rec_tail_t));
-
- // Check if record (header + xid + tail) fits within this page, we can check the
- // tail before the expense of copying data to memory
- if (hdr_xid_tail_dblks <= max_size_dblks)
- {
- // Entire header, xid and tail fits within this page
- std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
- rd_cnt += _deq_hdr._xidsize;
- std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, sizeof(_deq_tail));
- rd_cnt += sizeof(_deq_tail);
- chk_tail();
- }
- else if (hdr_xid_dblks <= max_size_dblks)
- {
- // Entire header and xid fit within this page, tail split
- std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
- rd_cnt += _deq_hdr._xidsize;
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Header fits within this page, xid split
- const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
- rd_cnt += xid_cp_size;
- }
- }
- }
- return size_dblks(rd_cnt);
-}
-
bool
-deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
+ uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
//_deq_hdr.hdr_copy(h);
::rec_hdr_copy(&_deq_hdr._rhdr, &h);
- ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(uint64_t));
- ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
+ ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(_deq_hdr._deq_rid));
+ ifsp->read((char*)&_deq_hdr._xidsize, sizeof(_deq_hdr._xidsize));
rec_offs = sizeof(_deq_hdr);
// Read header, allocate (if req'd) for xid
if (_deq_hdr._xidsize)
@@ -382,9 +222,21 @@ deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
}
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
- if (_deq_hdr._xidsize)
- chk_tail(); // Throws if tail invalid or record incomplete
assert(!ifsp->fail() && !ifsp->bad());
+ if (_deq_hdr._xidsize) {
+ int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, checksum);
+ if (res != 0) {
+ std::stringstream oss;
+ switch (res) {
+ case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break;
+ case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break;
+ case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _deq_tail._checksum; break;
+ default: oss << "Unknown error " << res;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info
+ }
+ }
return true;
}
@@ -427,38 +279,9 @@ deq_rec::rec_size() const
}
void
-deq_rec::chk_hdr() const
-{
- jrec::chk_hdr(_deq_hdr._rhdr);
- if (_deq_hdr._rhdr._magic != QLS_DEQ_MAGIC)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rhdr._rid;
- oss << ": expected=0x" << std::setw(8) << QLS_DEQ_MAGIC;
- oss << " read=0x" << std::setw(2) << (int)_deq_hdr._rhdr._magic;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "deq_rec", "chk_hdr");
- }
-}
-
-void
-deq_rec::chk_hdr(uint64_t rid) const
-{
- chk_hdr();
- jrec::chk_rid(_deq_hdr._rhdr, rid);
-}
-
-void
-deq_rec::chk_tail() const
-{
- jrec::chk_tail(_deq_tail, _deq_hdr._rhdr);
-}
-
-void
deq_rec::clean()
{
// clean up allocated memory here
}
-} // namespace journal
-} // namespace mrg
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h
index 12ba6af855..b4941895be 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h
@@ -19,73 +19,51 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
-#define QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+#ifndef QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H
+#define QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H
-namespace qpid
-{
-namespace qls_jrnl
-{
-class deq_rec;
-}}
-
-#include <cstddef>
-#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
- /**
- * \class deq_rec
- * \brief Class to handle a single journal dequeue record.
- */
- class deq_rec : public jrec
- {
- private:
- deq_hdr_t _deq_hdr; ///< Dequeue header
- const void* _xidp; ///< xid pointer for encoding (writing to disk)
- void* _buff; ///< Pointer to buffer to receive data read from disk
- rec_tail_t _deq_tail; ///< Record tail, only encoded if XID is present
+/**
+* \class deq_rec
+* \brief Class to handle a single journal dequeue record.
+*/
+class deq_rec : public jrec
+{
+private:
+ ::deq_hdr_t _deq_hdr; ///< Local instance of dequeue header struct
+ const void* _xidp; ///< xid pointer for encoding (writing to disk)
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ ::rec_tail_t _deq_tail; ///< Local instance of enqueue tail struct, only encoded if XID is present
- public:
- // constructor used for read operations and xid will have memory allocated
- deq_rec();
- // constructor used for write operations, where xid already exists
- deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool txn_coml_commit);
- virtual ~deq_rec();
+public:
+ deq_rec();
+ virtual ~deq_rec();
- // Prepare instance for use in reading data from journal
- void reset();
- // Prepare instance for use in writing data to journal
- void reset(const uint64_t rid, const uint64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool txn_coml_commit);
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
- uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
- uint32_t max_size_dblks);
- // Decode used for recover
- bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+ void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool txn_coml_commit);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
- inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
- inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
- inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; }
- std::size_t get_xid(void** const xidpp);
- std::string& str(std::string& str) const;
- inline std::size_t data_size() const { return 0; } // This record never carries data
- std::size_t xid_size() const;
- std::size_t rec_size() const;
+ inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
+ inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
+ inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; }
+ std::size_t get_xid(void** const xidpp);
+ std::string& str(std::string& str) const;
+ inline std::size_t data_size() const { return 0; } // This record never carries data
+ std::size_t xid_size() const;
+ std::size_t rec_size() const;
- private:
- virtual void chk_hdr() const;
- virtual void chk_hdr(uint64_t rid) const;
- virtual void chk_tail() const;
- virtual void clean();
- }; // class deq_rec
+private:
+ virtual void clean();
+};
-} // namespace journal
-} // namespace mrg
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
index 3bc49502c8..010968bdf7 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
@@ -27,10 +27,9 @@
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// static return/error codes
int16_t enq_map::EMAP_DUP_RID = -3;
@@ -183,4 +182,4 @@ enq_map::pfid_list(std::vector<uint64_t>& fv)
}
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
index 659e3bd052..aff839a022 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
-#define QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
+#define QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class enq_map;
-}}
+}}}
#include "qpid/linearstore/jrnl/jexception.h"
#include "qpid/linearstore/jrnl/smutex.h"
@@ -35,10 +34,9 @@ class enq_map;
#include <pthread.h>
#include <vector>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \class enq_map
@@ -107,6 +105,6 @@ namespace qls_jrnl
void pfid_list(std::vector<uint64_t>& fv);
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
index e968320ac6..8096d309f9 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
@@ -22,45 +22,22 @@
#include "qpid/linearstore/jrnl/enq_rec.h"
#include <cassert>
-#include <cerrno>
-#include <cstdlib>
#include <cstring>
#include <iomanip>
-#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
-#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
-// Constructor used for read operations, where buf contains preallocated space to receive data.
enq_rec::enq_rec():
jrec(), // superclass
- //_enq_hdr(QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false, false),
_xidp(0),
_data(0),
_buff(0)
- //_enq_tail(_enq_hdr)
-{
- ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false);
- ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
-}
-
-// Constructor used for transactional write operations, where dbuf contains data to be written.
-enq_rec::enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
- const void* const xidp, const std::size_t xidlen, const bool transient):
- jrec(), // superclass
- //_enq_hdr(QLS_ENQ_MAGIC, QLS_JRNL_VERSION, rid, xidlen, dlen, owi, transient),
- _xidp(xidp),
- _data(dbuf),
- _buff(0)
- //_enq_tail(_enq_hdr)
{
- ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, rid, xidlen, dlen);
+ ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false);
::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
- ::set_enq_transient(&_enq_hdr, transient);
}
enq_rec::~enq_rec()
@@ -68,28 +45,11 @@ enq_rec::~enq_rec()
clean();
}
-// Prepare instance for use in reading data from journal, where buf contains preallocated space
-// to receive data.
-void
-enq_rec::reset()
-{
- _enq_hdr._rhdr._rid = 0;
- ::set_enq_transient(&_enq_hdr, false);
- _enq_hdr._xidsize = 0;
- _enq_hdr._dsize = 0;
- _xidp = 0;
- _data = 0;
- _buff = 0;
- _enq_tail._rid = 0;
-}
-
-// Prepare instance for use in writing transactional data to journal, where dbuf contains data to
-// be written.
void
-enq_rec::reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
- const void* const xidp, const std::size_t xidlen, const bool transient,
- const bool external)
+enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+ const void* const xidp, const std::size_t xidlen, const bool transient, const bool external)
{
+ _enq_hdr._rhdr._serial = serial;
_enq_hdr._rhdr._rid = rid;
::set_enq_transient(&_enq_hdr, transient);
::set_enq_external(&_enq_hdr, external);
@@ -98,6 +58,7 @@ enq_rec::reset(const uint64_t rid, const void* const dbuf, const std::size_t dle
_xidp = xidp;
_data = dbuf;
_buff = 0;
+ _enq_tail._serial = serial;
_enq_tail._rid = rid;
}
@@ -246,217 +207,19 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
return size_dblks(wr_cnt);
}
-uint32_t
-enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
-{
- assert(rptr != 0);
- assert(max_size_dblks > 0);
-
- std::size_t rd_cnt = 0;
- if (rec_offs_dblks) // Continuation of record on new page
- {
- const uint32_t hdr_xid_data_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize +
- (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
- const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
- const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
- const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
- const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
- const std::size_t offs = rec_offs - sizeof(enq_hdr_t);
-
- if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of record fits within this page
- if (offs < _enq_hdr._xidsize)
- {
- // some XID still outstanding, copy remainder of XID, data and tail
- const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs;
- std::memcpy((char*)_buff + offs, rptr, rem);
- rd_cnt += rem;
- std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
- chk_tail();
- rd_cnt += sizeof(_enq_tail);
- }
- else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
- {
- // some data still outstanding, copy remainder of data and tail
- const std::size_t data_offs = offs - _enq_hdr._xidsize;
- const std::size_t data_rem = _enq_hdr._dsize - data_offs;
- std::memcpy((char*)_buff + offs, rptr, data_rem);
- rd_cnt += data_rem;
- std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
- chk_tail();
- rd_cnt += sizeof(_enq_tail);
- }
- else
- {
- // Tail or part of tail only outstanding, complete tail
- const std::size_t tail_offs = rec_offs - sizeof(enq_hdr_t) - _enq_hdr._xidsize -
- _enq_hdr._dsize;
- const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
- std::memcpy((char*)&_enq_tail + tail_offs, rptr, tail_rem);
- chk_tail();
- rd_cnt = tail_rem;
- }
- }
- else if (hdr_data_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid & data fits within this page; tail split
-
- /*
- * TODO: This section needs revision. Since it is known that the end of the page falls within the
- * tail record, it is only necessary to write from the current offset to the end of the page under
- * all circumstances. The multiple if/else combinations may be eliminated, as well as one memcpy()
- * operation.
- *
- * Also note that Coverity has detected a possible memory overwrite in this block. It occurs if
- * both the following two if() stmsts (numbered) are false. With rd_cnt = 0, this would result in
- * the value of tail_rem > sizeof(tail_rec). Practically, this could only happen if the start and
- * end of a page both fall within the same tail record, in which case the tail would have to be
- * (much!) larger. However, the logic here does not account for this possibility.
- *
- * If the optimization above is undertaken, this code would probably be removed.
- */
- if (offs < _enq_hdr._xidsize) // 1
- {
- // some XID still outstanding, copy remainder of XID and data
- const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs;
- std::memcpy((char*)_buff + offs, rptr, rem);
- rd_cnt += rem;
- }
- else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) // 2
- {
- // some data still outstanding, copy remainder of data
- const std::size_t data_offs = offs - _enq_hdr._xidsize;
- const std::size_t data_rem = _enq_hdr._dsize - data_offs;
- std::memcpy((char*)_buff + offs, rptr, data_rem);
- rd_cnt += data_rem;
- }
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Since xid and data are contiguous, both fit within current page - copy whole page
- const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
- std::memcpy((char*)_buff + offs, rptr, data_cp_size);
- rd_cnt += data_cp_size;
- }
- }
- else // Start of record
- {
- // Get and check header
- //_enq_hdr.hdr_copy(h);
- ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
- rd_cnt = sizeof(rec_hdr_t);
- _enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt += sizeof(std::size_t);
-#if defined(JRNL_32_BIT)
- rd_cnt += sizeof(uint32_t); // Filler 0
-#endif
- _enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt = sizeof(enq_hdr_t);
- chk_hdr();
- if (_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize))
- {
- _buff = std::malloc(_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize));
- MALLOC_CHK(_buff, "_buff", "enq_rec", "decode");
-
- const uint32_t hdr_xid_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize;
- const uint32_t hdr_xid_data_size = hdr_xid_size + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
- const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
- const uint32_t hdr_xid_dblks = size_dblks(hdr_xid_size);
- const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
- const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
- // Check if record (header + data + tail) fits within this page, we can check the
- // tail before the expense of copying data to memory
- if (hdr_tail_dblks <= max_size_dblks)
- {
- // Header, xid, data and tail fits within this page
- if (_enq_hdr._xidsize)
- {
- std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
- rd_cnt += _enq_hdr._xidsize;
- }
- if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
- {
- std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
- _enq_hdr._dsize);
- rd_cnt += _enq_hdr._dsize;
- }
- std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, sizeof(_enq_tail));
- chk_tail();
- rd_cnt += sizeof(_enq_tail);
- }
- else if (hdr_data_dblks <= max_size_dblks)
- {
- // Header, xid and data fit within this page, tail split or separated
- if (_enq_hdr._xidsize)
- {
- std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
- rd_cnt += _enq_hdr._xidsize;
- }
- if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
- {
- std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
- _enq_hdr._dsize);
- rd_cnt += _enq_hdr._dsize;
- }
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else if (hdr_xid_dblks <= max_size_dblks)
- {
- // Header and xid fits within this page, data split or separated
- if (_enq_hdr._xidsize)
- {
- std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
- rd_cnt += _enq_hdr._xidsize;
- }
- if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
- {
- const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size);
- rd_cnt += data_cp_size;
- }
- }
- else
- {
- // Header fits within this page, xid split or separated
- const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size);
- rd_cnt += data_cp_size;
- }
- }
- }
- return size_dblks(rd_cnt);
-}
-
bool
-enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
+ uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
// Read header, allocate (if req'd) for xid
//_enq_hdr.hdr_copy(h);
::rec_hdr_copy(&_enq_hdr._rhdr, &h);
- ifsp->read((char*)&_enq_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
- ifsp->read((char*)&_enq_hdr._dsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler1
-#endif
+ ifsp->read((char*)&_enq_hdr._xidsize, sizeof(_enq_hdr._xidsize));
+ ifsp->read((char*)&_enq_hdr._dsize, sizeof(_enq_hdr._dsize));
rec_offs = sizeof(_enq_hdr);
- if (_enq_hdr._xidsize)
+ if (_enq_hdr._xidsize > 0)
{
_buff = std::malloc(_enq_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
@@ -517,8 +280,19 @@ enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
}
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
- chk_tail(); // Throws if tail invalid or record incomplete
assert(!ifsp->fail() && !ifsp->bad());
+ int res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, checksum);
+ if (res != 0) {
+ std::stringstream oss;
+ switch (res) {
+ case 1: oss << std::hex << "Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; break;
+ case 2: oss << std::hex << "Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial; break;
+ case 3: oss << std::hex << "Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _enq_tail._checksum; break;
+ default: oss << "Unknown error " << res;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info
+ }
return true;
}
@@ -578,44 +352,9 @@ enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool
}
void
-enq_rec::set_rid(const uint64_t rid)
-{
- _enq_hdr._rhdr._rid = rid;
- _enq_tail._rid = rid;
-}
-
-void
-enq_rec::chk_hdr() const
-{
- jrec::chk_hdr(_enq_hdr._rhdr);
- if (_enq_hdr._rhdr._magic != QLS_ENQ_MAGIC)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "enq magic: rid=0x" << std::setw(16) << _enq_hdr._rhdr._rid;
- oss << ": expected=0x" << std::setw(8) << QLS_ENQ_MAGIC;
- oss << " read=0x" << std::setw(2) << (int)_enq_hdr._rhdr._magic;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "enq_rec", "chk_hdr");
- }
-}
-
-void
-enq_rec::chk_hdr(uint64_t rid) const
-{
- chk_hdr();
- jrec::chk_rid(_enq_hdr._rhdr, rid);
-}
-
-void
-enq_rec::chk_tail() const
-{
- jrec::chk_tail(_enq_tail, _enq_hdr._rhdr);
-}
-
-void
enq_rec::clean()
{
// clean up allocated memory here
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h
index b3502d9ad6..24ed9e99e1 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h
@@ -19,86 +19,54 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H
-#define QPID_LEGACYSTORE_JRNL_ENQ_REC_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_REC_H
+#define QPID_LINEARSTORE_JOURNAL_ENQ_REC_H
-namespace qpid
-{
-namespace qls_jrnl
-{
-class enq_rec;
-}}
-
-#include <cstddef>
-#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
-
- /**
- * \class enq_rec
- * \brief Class to handle a single journal enqueue record.
- */
- class enq_rec : public jrec
- {
- private:
- enq_hdr_t _enq_hdr;
- const void* _xidp; ///< xid pointer for encoding (for writing to disk)
- const void* _data; ///< Pointer to data to be written to disk
- void* _buff; ///< Pointer to buffer to receive data read from disk
- rec_tail_t _enq_tail;
-
- public:
- /**
- * \brief Constructor used for read operations.
- */
- enq_rec();
-
- /**
- * \brief Constructor used for write operations, where mbuf contains data to be written.
- */
- enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
- const void* const xidp, const std::size_t xidlen, const bool transient);
+namespace qpid {
+namespace linearstore {
+namespace journal {
- /**
- * \brief Destructor
- */
- virtual ~enq_rec();
+/**
+* \class enq_rec
+* \brief Class to handle a single journal enqueue record.
+*/
+class enq_rec : public jrec
+{
+private:
+ ::enq_hdr_t _enq_hdr; ///< Local instance of enqueue header struct
+ const void* _xidp; ///< xid pointer for encoding (for writing to disk)
+ const void* _data; ///< Pointer to data to be written to disk
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ ::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct
- // Prepare instance for use in reading data from journal, xid and data will be allocated
- void reset();
- // Prepare instance for use in writing data to journal
- void reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
- const void* const xidp, const std::size_t xidlen, const bool transient,
- const bool external);
+public:
+ enq_rec();
+ virtual ~enq_rec();
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
- uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
- // Decode used for recover
- bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+ void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+ const void* const xidp, const std::size_t xidlen, const bool transient, const bool external);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
- std::size_t get_xid(void** const xidpp);
- std::size_t get_data(void** const datapp);
- inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); }
- inline bool is_external() const { return ::is_enq_external(&_enq_hdr); }
- std::string& str(std::string& str) const;
- inline std::size_t data_size() const { return _enq_hdr._dsize; }
- inline std::size_t xid_size() const { return _enq_hdr._xidsize; }
- std::size_t rec_size() const;
- static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
- inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
- void set_rid(const uint64_t rid);
+ std::size_t get_xid(void** const xidpp);
+ std::size_t get_data(void** const datapp);
+ inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); }
+ inline bool is_external() const { return ::is_enq_external(&_enq_hdr); }
+ std::string& str(std::string& str) const;
+ inline std::size_t data_size() const { return _enq_hdr._dsize; }
+ inline std::size_t xid_size() const { return _enq_hdr._xidsize; }
+ std::size_t rec_size() const;
+ static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
+ inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
- private:
- void chk_hdr() const;
- void chk_hdr(uint64_t rid) const;
- void chk_tail() const;
- virtual void clean();
- }; // class enq_rec
+private:
+ virtual void clean();
+};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENQ_REC_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
index 511a2a41ab..106f58cf5f 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
@@ -19,13 +19,12 @@
*
*/
-#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
-#define QPID_LINEARSTORE_JRNL_ENUMS_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENUMS_H
+#define QPID_LINEARSTORE_JOURNAL_ENUMS_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// TODO: Change this to flags, as multiple of these conditions may exist simultaneously
/**
@@ -37,12 +36,7 @@ namespace qls_jrnl
RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read.
-// RHM_IORES_RCINVALID, ///< Read page cache is invalid (ie obsolete or uninitialized)
-// RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
-// RHM_IORES_FULL, ///< During write operations, the journal files are full.
-// RHM_IORES_BUSY, ///< Another blocking operation is in progress.
RHM_IORES_TXPENDING ///< Operation blocked by pending transaction.
-// RHM_IORES_NOTIMPL ///< Function is not implemented.
};
typedef _iores iores;
@@ -54,46 +48,11 @@ namespace qls_jrnl
case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
-// case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
-// case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
-// case RHM_IORES_FULL: return "RHM_IORES_FULL";
-// case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
-// case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
}
return "<iores unknown>";
}
-/*
- enum _log_level
- {
- LOG_TRACE = 0,
- LOG_DEBUG,
- LOG_INFO,
- LOG_NOTICE,
- LOG_WARN,
- LOG_ERROR,
- LOG_CRITICAL
- };
- typedef _log_level log_level_t;
-
- static inline const char* log_level_str(log_level_t ll)
- {
- switch (ll)
- {
- case LOG_TRACE: return "TRACE";
- case LOG_DEBUG: return "DEBUG";
- case LOG_INFO: return "INFO";
- case LOG_NOTICE: return "NOTICE";
- case LOG_WARN: return "WARN";
- case LOG_ERROR: return "ERROR";
- case LOG_CRITICAL: return "CRITICAL";
- }
- return "<log level unknown>";
- }
-*/
-
-
-}}
+}}}
-#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENUMS_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
index 432887e577..83c61bcb5f 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
-#define QPID_LEGACYSTORE_JRNL_JCFG_H
+#ifndef QPID_QLS_JRNL_JCFG_H
+#define QPID_QLS_JRNL_JCFG_H
#define QLS_SBLK_SIZE_BYTES 4096 /**< Disk softblock size in bytes, should match size used on disk media */
#define QLS_AIO_ALIGN_BOUNDARY_BYTES QLS_SBLK_SIZE_BYTES /** Memory alignment boundary used for DMA */
@@ -55,4 +55,4 @@
#define QLS_CLEAN /**< If defined, writes QLS_CLEAN_CHAR to all filled areas on disk */
#define QLS_CLEAN_CHAR 0xff /**< Char used to clear empty space on disk */
-#endif /* ifndef QPID_LEGACYSTORE_JRNL_JCFG_H */
+#endif /* ifndef QPID_QLS_JRNL_JCFG_H */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
index f21deead5d..099c32b18d 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
@@ -38,10 +38,9 @@
#include <sstream>
#include <unistd.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
#define AIO_CMPL_TIMEOUT_SEC 5
#define AIO_CMPL_TIMEOUT_NSEC 0
@@ -132,9 +131,9 @@ jcntl::recover(EmptyFilePoolManager* efpmp,
_recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
highest_rid = _recoveryManager.getHighestRecordId();
- _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
+ _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
_linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
- _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
+ _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
(_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
@@ -430,4 +429,4 @@ std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::en
return false;
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
index bb91eac569..76d2a9877d 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
-#define QPID_LINEARSTORE_JRNL_JCNTL_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H
+#define QPID_LINEARSTORE_JOURNAL_JCNTL_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class jcntl;
-}}
+}}}
#include <cstddef>
#include <deque>
@@ -38,10 +37,9 @@ namespace qls_jrnl
#include "qpid/linearstore/jrnl/smutex.h"
#include "qpid/linearstore/jrnl/wmgr.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class EmptyFilePool;
class EmptyFilePoolManager;
@@ -570,6 +568,6 @@ namespace qls_jrnl
bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
};
-}}
+}}}
-#endif // ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
index 977c75e37a..734493074f 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
@@ -32,10 +32,9 @@
#include <sys/stat.h>
#include <unistd.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
jdir::jdir(const std::string& dirname/*, const std::string& _base_filename*/):
_dirname(dirname)/*,
@@ -476,4 +475,4 @@ operator<<(std::ostream& os, const jdir* jdirPtr)
return os;
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
index 01a08c57fc..c13a5f5af0 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
@@ -19,25 +19,23 @@
*
*/
-#ifndef QPID_LINEARSTORE_JRNL_JDIR_H
-#define QPID_LINEARSTORE_JRNL_JDIR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H
+#define QPID_LINEARSTORE_JOURNAL_JDIR_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class jdir;
-}}
+}}}
//#include "qpid/linearstore/jrnl/jinf.h"
#include <dirent.h>
#include <string>
#include <vector>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \class jdir
@@ -364,6 +362,6 @@ namespace qls_jrnl
static void close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_name);
};
-}}
+}}}
-#endif // ifndef QPID_LINEARSTORE_JRNL_JDIR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
index c8fd2f55fe..a68966030b 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
@@ -21,10 +21,9 @@
#include "qpid/linearstore/jrnl/jerrno.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
std::map<uint32_t, const char*> jerrno::_err_map;
std::map<uint32_t, const char*>::iterator jerrno::_err_map_itr;
@@ -214,4 +213,4 @@ jerrno::err_msg(const uint32_t err_no) throw ()
return _err_map_itr->second;
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
index 4c7124ae22..5af0a7ada0 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
@@ -19,24 +19,22 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JERRNO_H
-#define QPID_LEGACYSTORE_JRNL_JERRNO_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JERRNO_H
+#define QPID_LINEARSTORE_JOURNAL_JERRNO_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class jerrno;
-}}
+}}}
#include <map>
#include <stdint.h>
#include <string>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \class jerrno
@@ -144,6 +142,6 @@ namespace qls_jrnl
static bool __init();
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JERRNO_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JERRNO_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp
index 38ef85ddb8..44e9142698 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp
@@ -27,10 +27,9 @@
#define CATLEN(p) MAX_MSG_SIZE - std::strlen(p) - 1
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
jexception::jexception() throw ():
std::exception(),
@@ -168,4 +167,4 @@ operator<<(std::ostream& os, const jexception* jePtr)
return os;
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h b/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h
index a3a34691df..712b357254 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
-#define QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H
+#define QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class jexception;
-}}
+}}}
#include <cerrno>
#include <cstdio>
@@ -70,10 +69,10 @@ class jexception;
::abort(); \
}
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
/**
* \class jexception
* \brief Generic journal exception class
@@ -121,6 +120,6 @@ namespace qls_jrnl
friend std::ostream& operator<<(std::ostream& os, const jexception* jePtr);
}; // class jexception
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp
index 87e0b0c04c..45f0197f8b 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp
@@ -24,84 +24,15 @@
#include <iomanip>
#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/utils/rec_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
jrec::jrec() {}
jrec::~jrec() {}
-void
-jrec::chk_hdr(const rec_hdr_t& hdr)
-{
- if (hdr._magic == 0)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "enq magic NULL: rid=0x" << hdr._rid;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
- }
- if (hdr._version != QLS_JRNL_VERSION)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "version: rid=0x" << hdr._rid;
- oss << ": expected=0x" << std::setw(2) << (int)QLS_JRNL_VERSION;
- oss << " read=0x" << std::setw(2) << (int)hdr._version;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
- }
-//#if defined (JRNL_LITTLE_ENDIAN)
-// uint8_t endian_flag = RHM_LENDIAN_FLAG;
-//#else
-// uint8_t endian_flag = RHM_BENDIAN_FLAG;
-//#endif
-// if (hdr._eflag != endian_flag)
-// {
-// std::ostringstream oss;
-// oss << std::hex << std::setfill('0');
-// oss << "endian_flag: rid=" << hdr._rid;
-// oss << ": expected=0x" << std::setw(2) << (int)endian_flag;
-// oss << " read=0x" << std::setw(2) << (int)hdr._eflag;
-// throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
-// }
-}
-
-void
-jrec::chk_rid(const rec_hdr_t& hdr, const uint64_t rid)
-{
- if (hdr._rid != rid)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "rid mismatch: expected=0x" << rid;
- oss << " read=0x" << hdr._rid;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
- }
-}
-
-void
-jrec::chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr)
-{
- if (tail._xmagic != ~hdr._magic)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "magic: rid=0x" << hdr._rid;
- oss << ": expected=0x" << ~hdr._magic;
- oss << " read=0x" << tail._xmagic;
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
- }
- if (tail._rid != hdr._rid)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "rid: rid=0x" << hdr._rid;
- oss << ": read=0x" << tail._rid;
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
- }
-}
-
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
index 619cd0bdec..ef481cc3ed 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
@@ -19,152 +19,105 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JREC_H
-#define QPID_LEGACYSTORE_JRNL_JREC_H
-
-namespace qpid
-{
-namespace qls_jrnl
-{
-class jrec;
-}}
+#ifndef QPID_LINEARSTORE_JOURNAL_JREC_H
+#define QPID_LINEARSTORE_JOURNAL_JREC_H
#include <cstddef>
#include <fstream>
#include "qpid/linearstore/jrnl/jcfg.h"
-#include "qpid/linearstore/jrnl/utils/rec_hdr.h"
-#include "qpid/linearstore/jrnl/utils/rec_tail.h"
+#include <stdint.h>
#include <string>
-namespace qpid
-{
-namespace qls_jrnl
+struct rec_hdr_t;
+struct rec_tail_t;
+
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+/**
+* \class jrec
+* \brief Abstract class for all file jrecords, both data and log. This class establishes
+* the common data format and structure for these jrecords.
+*/
+class jrec
{
+public:
+ jrec();
+ virtual ~jrec();
/**
- * \class jrec
- * \brief Abstract class for all file jrecords, both data and log. This class establishes
- * the common data format and structure for these jrecords.
+ * \brief Encode this instance of jrec into the write buffer at the disk-block-aligned
+ * pointer wptr starting at position rec_offs_dblks in the encoded record to a
+ * maximum size of max_size_dblks.
+ *
+ * This call encodes the content of the data contianed in this instance of jrec into a
+ * disk-softblock-aligned (defined by JRNL_SBLK_SIZE) buffer pointed to by parameter
+ * wptr. No more than paramter max_size_dblks data-blocks may be written to the buffer.
+ * The parameter rec_offs_dblks is the offset in data-blocks within the fully encoded
+ * data block this instance represents at which to start encoding.
+ *
+ * Encoding entails writing the record header (struct enq_hdr), the data and the record tail
+ * (struct enq_tail). The record must be data-block-aligned (defined by JRNL_DBLK_SIZE),
+ * thus any remaining space in the final data-block is ignored; the returned value is the
+ * number of data-blocks consumed from the page by the encode action. Provided the initial
+ * alignment requirements are met, records may be of arbitrary size and may span multiple
+ * data-blocks, disk-blocks and/or pages.
+ *
+ * Since the record size in data-blocks is known, the general usage pattern is to call
+ * encode() as many times as is needed to fully encode the data. Each call to encode()
+ * will encode as much of the record as it can to what remains of the current page cache,
+ * and will return the number of data-blocks actually encoded.
+ *
+ * <b>Example:</b> Assume that record r1 was previously written to page 0, and that this
+ * is an instance representing record r2. Being larger than the page size ps, r2 would span
+ * multiple pages as follows:
+ * <pre>
+ * |<---ps--->|
+ * +----------+----------+----------+----...
+ * | |r2a| r2b | r2c | |
+ * |<-r1-><----------r2----------> |
+ * +----------+----------+----------+----...
+ * page: p0 p1 p2
+ * </pre>
+ * Encoding record r2 will require multiple calls to encode; one for each page which
+ * is involved. Record r2 is divided logically into sections r2a, r2b and r2c at the
+ * points where the page boundaries intersect with the record. Assuming a page size
+ * of ps, the page boundary pointers are represented by their names p0, p1... and the
+ * sizes of the record segments are represented by their names r1, r2a, r2b..., the calls
+ * should be as follows:
+ * <pre>
+ * encode(p0+r1, 0, ps-r1); (returns r2a data-blocks)
+ * encode(p1, r2a, ps); (returns r2b data-blocks which equals ps)
+ * encode(p2, r2a+r2b, ps); (returns r2c data-blocks)
+ * </pre>
+ *
+ * \param wptr Data-block-aligned pointer to position in page buffer where encoding is to
+ * take place.
+ * \param rec_offs_dblks Offset in data-blocks within record from which to start encoding.
+ * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
+ * \returns Number of data-blocks encoded.
*/
- class jrec
- {
- public:
- jrec();
- virtual ~jrec();
-
- /**
- * \brief Encode this instance of jrec into the write buffer at the disk-block-aligned
- * pointer wptr starting at position rec_offs_dblks in the encoded record to a
- * maximum size of max_size_dblks.
- *
- * This call encodes the content of the data contianed in this instance of jrec into a
- * disk-softblock-aligned (defined by JRNL_SBLK_SIZE) buffer pointed to by parameter
- * wptr. No more than paramter max_size_dblks data-blocks may be written to the buffer.
- * The parameter rec_offs_dblks is the offset in data-blocks within the fully encoded
- * data block this instance represents at which to start encoding.
- *
- * Encoding entails writing the record header (struct enq_hdr), the data and the record tail
- * (struct enq_tail). The record must be data-block-aligned (defined by JRNL_DBLK_SIZE),
- * thus any remaining space in the final data-block is ignored; the returned value is the
- * number of data-blocks consumed from the page by the encode action. Provided the initial
- * alignment requirements are met, records may be of arbitrary size and may span multiple
- * data-blocks, disk-blocks and/or pages.
- *
- * Since the record size in data-blocks is known, the general usage pattern is to call
- * encode() as many times as is needed to fully encode the data. Each call to encode()
- * will encode as much of the record as it can to what remains of the current page cache,
- * and will return the number of data-blocks actually encoded.
- *
- * <b>Example:</b> Assume that record r1 was previously written to page 0, and that this
- * is an instance representing record r2. Being larger than the page size ps, r2 would span
- * multiple pages as follows:
- * <pre>
- * |<---ps--->|
- * +----------+----------+----------+----...
- * | |r2a| r2b | r2c | |
- * |<-r1-><----------r2----------> |
- * +----------+----------+----------+----...
- * page: p0 p1 p2
- * </pre>
- * Encoding record r2 will require multiple calls to encode; one for each page which
- * is involved. Record r2 is divided logically into sections r2a, r2b and r2c at the
- * points where the page boundaries intersect with the record. Assuming a page size
- * of ps, the page boundary pointers are represented by their names p0, p1... and the
- * sizes of the record segments are represented by their names r1, r2a, r2b..., the calls
- * should be as follows:
- * <pre>
- * encode(p0+r1, 0, ps-r1); (returns r2a data-blocks)
- * encode(p1, r2a, ps); (returns r2b data-blocks which equals ps)
- * encode(p2, r2a+r2b, ps); (returns r2c data-blocks)
- * </pre>
- *
- * \param wptr Data-block-aligned pointer to position in page buffer where encoding is to
- * take place.
- * \param rec_offs_dblks Offset in data-blocks within record from which to start encoding.
- * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
- * \returns Number of data-blocks encoded.
- */
- virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks,
- uint32_t max_size_dblks) = 0;
-
- /**
- * \brief Decode into this instance of jrec from the read buffer at the disk-block-aligned
- * pointer rptr starting at position jrec_offs_dblks in the encoded record to a
- * maximum size of max_size_blks.
- *
- * This call decodes a record in the page buffer pointed to by the data-block-aligned
- * (defined by JRNL_DBLK_SIZE) parameter rptr into this instance of jrec. No more than
- * paramter max_size_dblks data-blocks may be read from the buffer. The parameter
- * jrec_offs_dblks is the offset in data-blocks within the encoded record at which to start
- * decoding.
- *
- * Decoding entails reading the record header, the data and the tail. The record is
- * data-block-aligned (defined by JRNL_DBLK_SIZE); the returned value is the number of
- * data-blocks read from the buffer by the decode action. As the record data size is only
- * known once the header is read, the number of calls required to complete reading the
- * record will depend on the vlaues within this instance which are set when the
- * header is decoded.
- *
- * A non-zero value for jrec_offs_dblks implies that this is not the first call to
- * decode and the record data will be appended at this offset.
- *
- * \param h Reference to instance of struct hdr, already read from page buffer and used
- * to determine record type
- * \param rptr Data-block-aligned pointer to position in page buffer where decoding is to
- * begin.
- * \param rec_offs_dblks Offset within record from which to start appending the decoded
- * record.
- * \param max_size_dblks Maximum number of data-blocks to read from pointer rptr.
- * \returns Number of data-blocks read (consumed).
- */
- virtual uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
- uint32_t max_size_dblks) = 0;
-
- virtual bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
+ virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) = 0;
+ virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
- virtual std::string& str(std::string& str) const = 0;
- virtual std::size_t data_size() const = 0;
- virtual std::size_t xid_size() const = 0;
- virtual std::size_t rec_size() const = 0;
- inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
- static inline uint32_t size_dblks(const std::size_t size)
- { return size_blks(size, QLS_DBLK_SIZE_BYTES); }
- static inline uint32_t size_sblks(const std::size_t size)
- { return size_blks(size, QLS_SBLK_SIZE_BYTES); }
- static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
- { return (size + blksize - 1)/blksize; }
- virtual uint64_t rid() const = 0;
+ virtual std::string& str(std::string& str) const = 0;
+ virtual std::size_t data_size() const = 0;
+ virtual std::size_t xid_size() const = 0;
+ virtual std::size_t rec_size() const = 0;
+ inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
+ static inline uint32_t size_dblks(const std::size_t size)
+ { return size_blks(size, QLS_DBLK_SIZE_BYTES); }
+ static inline uint32_t size_sblks(const std::size_t size)
+ { return size_blks(size, QLS_SBLK_SIZE_BYTES); }
+ static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
+ { return (size + blksize - 1)/blksize; }
+ virtual uint64_t rid() const = 0;
- protected:
- virtual void chk_hdr() const = 0;
- virtual void chk_hdr(uint64_t rid) const = 0;
- virtual void chk_tail() const = 0;
- static void chk_hdr(const rec_hdr_t& hdr);
- static void chk_rid(const rec_hdr_t& hdr, uint64_t rid);
- static void chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr);
- virtual void clean() = 0;
- }; // class jrec
+protected:
+ virtual void clean() = 0;
+};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JREC_H
+#endif // ifndef QPID_LINEARSTORE_JRNL_JREC_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
index 32b024a09d..b561c6ca2b 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
@@ -29,10 +29,9 @@
#include "qpid/linearstore/jrnl/jerrno.h"
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
pmgr::page_cb::page_cb(uint16_t index):
_index(index),
@@ -199,4 +198,4 @@ pmgr::page_state_str(page_state ps)
return "<page_state unknown>";
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
index 64143a9e6e..787828af01 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
@@ -19,16 +19,15 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
-#define QPID_LEGACYSTORE_JRNL_PMGR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_PMGR_H
+#define QPID_LINEARSTORE_JOURNAL_PMGR_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class pmgr;
class jcntl;
-}}
+}}}
#include <deque>
#include "qpid/linearstore/jrnl/aio.h"
@@ -40,11 +39,11 @@ namespace qls_jrnl
#include "qpid/linearstore/jrnl/txn_map.h"
#include "qpid/linearstore/jrnl/txn_rec.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
-class JournalFile;
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+ class JournalFile;
/**
* \brief Abstract class for managing either read or write page cache of arbitrary size and
@@ -122,6 +121,6 @@ class JournalFile;
virtual void clean();
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_PMGR_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/slock.h b/qpid/cpp/src/qpid/linearstore/jrnl/slock.h
index bbfee37353..b118f71bde 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/slock.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/slock.h
@@ -19,17 +19,16 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H
-#define QPID_LEGACYSTORE_JRNL_SLOCK_H
+#ifndef QPID_LINEARSTORE_JOURNAL_SLOCK_H
+#define QPID_LINEARSTORE_JOURNAL_SLOCK_H
#include "qpid/linearstore/jrnl/jexception.h"
#include "qpid/linearstore/jrnl/smutex.h"
#include <pthread.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope
class slock
@@ -68,6 +67,6 @@ namespace qls_jrnl
inline bool locked() const { return _locked; }
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_SLOCK_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h b/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h
index 087e19730b..91520ce1ae 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h
@@ -19,16 +19,15 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H
-#define QPID_LEGACYSTORE_JRNL_SMUTEX_H
+#ifndef QPID_LINEARSTORE_JOURNAL_SMUTEX_H
+#define QPID_LINEARSTORE_JOURNAL_SMUTEX_H
#include "qpid/linearstore/jrnl/jexception.h"
#include <pthread.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// Ultra-simple scoped mutex class that allows a posix mutex to be initialized and destroyed with error checks
class smutex
@@ -47,6 +46,6 @@ namespace qls_jrnl
inline pthread_mutex_t* get() const { return &_m; }
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_SMUTEX_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp
index 49e9a61704..5bb04496a4 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp
@@ -23,10 +23,9 @@
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
const std::string
time_ns::str(int precision) const
@@ -39,5 +38,4 @@ time_ns::str(int precision) const
return oss.str();
}
-
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h b/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h
index 3319573001..a228d47475 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h
@@ -19,17 +19,16 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H
-#define QPID_LEGACYSTORE_JRNL_TIME_NS_H
+#ifndef QPID_LINEARSTORE_JOURNAL_TIME_NS_H
+#define QPID_LINEARSTORE_JOURNAL_TIME_NS_H
#include <cerrno>
#include <ctime>
#include <string>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
struct time_ns : public timespec
{
@@ -88,6 +87,6 @@ struct time_ns : public timespec
{ if(tv_sec == rhs.tv_sec) return tv_nsec <= rhs.tv_nsec; return tv_sec <= rhs.tv_sec; }
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_TIME_NS_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
index 9c32f4128b..2cc7966efb 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
@@ -27,10 +27,9 @@
#include "qpid/linearstore/jrnl/slock.h"
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// return/error codes
int16_t txn_map::TMAP_RID_NOT_FOUND = -2;
@@ -231,4 +230,4 @@ txn_map::xid_list(std::vector<std::string>& xv)
}
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
index 146997abe0..419e043529 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H
-#define QPID_LEGACYSTORE_JRNL_TXN_MAP_H
+#ifndef QPID_LINEARSTORE_JOURNAL_TXN_MAP_H
+#define QPID_LINEARSTORE_JOURNAL_TXN_MAP_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class txn_map;
-}}
+}}}
#include "qpid/linearstore/jrnl/smutex.h"
#include <map>
@@ -35,10 +34,9 @@ namespace qls_jrnl
#include <string>
#include <vector>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \struct txn_data_struct
@@ -141,6 +139,6 @@ namespace qls_jrnl
const txn_data_list get_tdata_list_nolock(const std::string& xid);
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_TXN_MAP_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
index 3c78903520..20333be622 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
@@ -22,38 +22,20 @@
#include "qpid/linearstore/jrnl/txn_rec.h"
#include <cassert>
-#include <cerrno>
-#include <cstdlib>
#include <cstring>
#include <iomanip>
-#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
-#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
txn_rec::txn_rec():
-// _txn_hdr(),
_xidp(0),
_buff(0)
-// _txn_tail()
{
- ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0);
- ::rec_tail_init(&_txn_tail, 0, 0, 0);
-}
-
-txn_rec::txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp,
- const std::size_t xidlen/*, const bool owi*/):
-// _txn_hdr(magic, RHM_JDAT_VERSION, rid, xidlen, owi),
- _xidp(xidp),
- _buff(0)
-// _txn_tail(_txn_hdr)
-{
- ::txn_hdr_init(&_txn_hdr, magic, QLS_JRNL_VERSION, 0, rid, xidlen);
- ::rec_tail_copy(&_txn_tail, &_txn_hdr._rhdr, 0);
+ ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0, 0);
+ ::rec_tail_init(&_txn_tail, 0, 0, 0, 0);
}
txn_rec::~txn_rec()
@@ -62,28 +44,17 @@ txn_rec::~txn_rec()
}
void
-txn_rec::reset(const uint32_t magic)
+txn_rec::reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp,
+ const std::size_t xidlen)
{
- _txn_hdr._rhdr._magic = magic;
- _txn_hdr._rhdr._rid = 0;
- _txn_hdr._xidsize = 0;
- _xidp = 0;
- _buff = 0;
- _txn_tail._xmagic = ~magic;
- _txn_tail._rid = 0;
-}
-
-void
-txn_rec::reset(const uint32_t magic, const uint64_t rid, const void* const xidp,
- const std::size_t xidlen/*, const bool owi*/)
-{
- _txn_hdr._rhdr._magic = magic;
+ _txn_hdr._rhdr._magic = commitFlag ? QLS_TXC_MAGIC : QLS_TXA_MAGIC;
+ _txn_hdr._rhdr._serial = serial;
_txn_hdr._rhdr._rid = rid;
-// _txn_hdr.set_owi(owi);
_txn_hdr._xidsize = xidlen;
_xidp = xidp;
_buff = 0;
- _txn_tail._xmagic = ~magic;
+ _txn_tail._xmagic = ~_txn_hdr._rhdr._magic;
+ _txn_tail._serial = serial;
_txn_tail._rid = rid;
}
@@ -195,132 +166,16 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
return size_dblks(wr_cnt);
}
-uint32_t
-txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
-{
- assert(rptr != 0);
- assert(max_size_dblks > 0);
-
- std::size_t rd_cnt = 0;
- if (rec_offs_dblks) // Continuation of record on new page
- {
- const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
- const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t));
- const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
-
- if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid fits within this page
- if (rec_offs - sizeof(txn_hdr_t) < _txn_hdr._xidsize)
- {
- // Part of xid still outstanding, copy remainder of xid and tail
- const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t);
- const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
- std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
- rd_cnt = xid_rem;
- std::memcpy((void*)&_txn_tail, ((char*)rptr + rd_cnt), sizeof(_txn_tail));
- chk_tail();
- rd_cnt += sizeof(_txn_tail);
- }
- else
- {
- // Tail or part of tail only outstanding, complete tail
- const std::size_t tail_offs = rec_offs - sizeof(txn_hdr_t) - _txn_hdr._xidsize;
- const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
- std::memcpy((char*)&_txn_tail + tail_offs, rptr, tail_rem);
- chk_tail();
- rd_cnt = tail_rem;
- }
- }
- else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid fits within this page, tail split
- const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t);
- const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
- std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
- rd_cnt += xid_rem;
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Remainder of xid split
- const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
- std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size);
- rd_cnt += xid_cp_size;
- }
- }
- else // Start of record
- {
- // Get and check header
- //_txn_hdr.hdr_copy(h);
- ::rec_hdr_copy(&_txn_hdr._rhdr, &h);
- rd_cnt = sizeof(rec_hdr_t);
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- rd_cnt += sizeof(uint32_t); // Filler 0
-#endif
- _txn_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt = sizeof(txn_hdr_t);
- chk_hdr();
- _buff = std::malloc(_txn_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "txn_rec", "decode");
- const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
- const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize +
- sizeof(rec_tail_t));
-
- // Check if record (header + xid + tail) fits within this page, we can check the
- // tail before the expense of copying data to memory
- if (hdr_xid_tail_dblks <= max_size_dblks)
- {
- // Entire header, xid and tail fits within this page
- std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
- rd_cnt += _txn_hdr._xidsize;
- std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, sizeof(_txn_tail));
- rd_cnt += sizeof(_txn_tail);
- chk_tail();
- }
- else if (hdr_xid_dblks <= max_size_dblks)
- {
- // Entire header and xid fit within this page, tail split
- std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
- rd_cnt += _txn_hdr._xidsize;
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Header fits within this page, xid split
- const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
- rd_cnt += xid_cp_size;
- }
- }
- return size_dblks(rd_cnt);
-}
-
bool
-txn_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
+ uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
// Read header, allocate for xid
//_txn_hdr.hdr_copy(h);
::rec_hdr_copy(&_txn_hdr._rhdr, &h);
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
- ifsp->read((char*)&_txn_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
+ ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize));
rec_offs = sizeof(txn_hdr_t);
_buff = std::malloc(_txn_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
@@ -358,8 +213,22 @@ txn_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
}
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
- chk_tail(); // Throws if tail invalid or record incomplete
+ if (::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, 0)) { // TODO: add checksum
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail
+ }
assert(!ifsp->fail() && !ifsp->bad());
+ int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, checksum);
+ if (res != 0) {
+ std::stringstream oss;
+ switch (res) {
+ case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break;
+ case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break;
+ case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _txn_tail._checksum; break;
+ default: oss << "Unknown error " << res;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info
+ }
return true;
}
@@ -403,38 +272,9 @@ txn_rec::rec_size() const
}
void
-txn_rec::chk_hdr() const
-{
- jrec::chk_hdr(_txn_hdr._rhdr);
- if (_txn_hdr._rhdr._magic != QLS_TXA_MAGIC && _txn_hdr._rhdr._magic != QLS_TXC_MAGIC)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "dtx magic: rid=0x" << std::setw(16) << _txn_hdr._rhdr._rid;
- oss << ": expected=(0x" << std::setw(8) << QLS_TXA_MAGIC;
- oss << " or 0x" << QLS_TXC_MAGIC;
- oss << ") read=0x" << std::setw(2) << (int)_txn_hdr._rhdr._magic;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "txn_rec", "chk_hdr");
- }
-}
-
-void
-txn_rec::chk_hdr(uint64_t rid) const
-{
- chk_hdr();
- jrec::chk_rid(_txn_hdr._rhdr, rid);
-}
-
-void
-txn_rec::chk_tail() const
-{
- jrec::chk_tail(_txn_tail, _txn_hdr._rhdr);
-}
-
-void
txn_rec::clean()
{
// clean up allocated memory here
}
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h
index 4e9700e392..f256891664 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h
@@ -19,70 +19,49 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H
-#define QPID_LEGACYSTORE_JRNL_TXN_REC_H
+#ifndef QPID_LINEARSTORE_JOURNAL_TXN_REC_H
+#define QPID_LINEARSTORE_JOURNAL_TXN_REC_H
-namespace qpid
-{
-namespace qls_jrnl
-{
-class txn_rec;
-}}
-
-#include <cstddef>
#include "qpid/linearstore/jrnl/jrec.h"
#include "qpid/linearstore/jrnl/utils/txn_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
- /**
- * \class txn_rec
- * \brief Class to handle a single journal DTX commit or abort record.
- */
- class txn_rec : public jrec
- {
- private:
- txn_hdr_t _txn_hdr; ///< transaction header
- const void* _xidp; ///< xid pointer for encoding (writing to disk)
- void* _buff; ///< Pointer to buffer to receive data read from disk
- rec_tail_t _txn_tail; ///< Record tail
+/**
+* \class txn_rec
+* \brief Class to handle a single journal commit or abort record.
+*/
+class txn_rec : public jrec
+{
+private:
+ ::txn_hdr_t _txn_hdr; ///< Local instance of transaction header struct
+ const void* _xidp; ///< xid pointer for encoding (writing to disk)
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ ::rec_tail_t _txn_tail; ///< Local instance of enqueue tail struct
- public:
- // constructor used for read operations and xid must have memory allocated
- txn_rec();
- // constructor used for write operations, where xid already exists
- txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp,
- const std::size_t xidlen/*, const bool owi*/);
- virtual ~txn_rec();
+public:
+ txn_rec();
+ virtual ~txn_rec();
- // Prepare instance for use in reading data from journal
- void reset(const uint32_t magic);
- // Prepare instance for use in writing data to journal
- void reset(const uint32_t magic, const uint64_t rid, const void* const xidp,
- const std::size_t xidlen/*, const bool owi*/);
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
- uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
- uint32_t max_size_dblks);
- // Decode used for recover
- bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+ void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp,
+ const std::size_t xidlen);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
- std::size_t get_xid(void** const xidpp);
- std::string& str(std::string& str) const;
- inline std::size_t data_size() const { return 0; } // This record never carries data
- std::size_t xid_size() const;
- std::size_t rec_size() const;
- inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
+ std::size_t get_xid(void** const xidpp);
+ std::string& str(std::string& str) const;
+ inline std::size_t data_size() const { return 0; } // This record never carries data
+ std::size_t xid_size() const;
+ std::size_t rec_size() const;
+ inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
- private:
- void chk_hdr() const;
- void chk_hdr(uint64_t rid) const;
- void chk_tail() const;
- virtual void clean();
- }; // class txn_rec
+private:
+ virtual void clean();
+};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_TXN_REC_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c
index 0cdab3580c..144ce4125a 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c
@@ -23,9 +23,9 @@
/*static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;*/
-void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t
- rid, const uint64_t deq_rid, const uint64_t xidsize) {
- rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t serial, const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid);
dest->_deq_rid = deq_rid;
dest->_xidsize = xidsize;
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h
index 25cfc71efb..3392867153 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -42,12 +42,14 @@ extern "C"{
* The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of a
* previous enqueue record being dequeued by this record.
*
- * Record header info in binary format (32 bytes):
+ * Record header info in binary format (40 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+ -+
* | magic | ver | flags | |
- * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
+ * | serial | | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
* | rid | |
* +---+---+---+---+---+---+---+---+ -+
* | deq-rid |
@@ -67,7 +69,7 @@ typedef struct deq_hdr_t {
static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize);
+ const uint64_t serial, const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize);
void deq_hdr_copy(deq_hdr_t* dest, const deq_hdr_t* src);
bool is_txn_coml_commit(const deq_hdr_t *dh);
void set_txn_coml_commit(deq_hdr_t *dh, const bool commit);
@@ -78,4 +80,4 @@ void set_txn_coml_commit(deq_hdr_t *dh, const bool commit);
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
index b8d2da7722..b4e8b62ff1 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
@@ -25,8 +25,8 @@
//static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t xidsize, const uint64_t dsize) {
- rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+ const uint64_t serial, const uint64_t rid, const uint64_t xidsize, const uint64_t dsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid);
dest->_xidsize = xidsize;
dest->_dsize = dsize;
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
index 1beaef1db8..00108792bc 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -38,12 +38,14 @@ extern "C"{
*
* This header precedes all enqueue data in journal files.
*
- * Record header info in binary format (32 bytes):
+ * Record header info in binary format (40 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+ -+
* | magic | ver | flags | |
- * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
+ * | serial | | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
* | rid | |
* +---+---+---+---+---+---+---+---+ -+
* | xidsize |
@@ -64,7 +66,7 @@ static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10;
static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t xidsize, const uint64_t dsize);
+ const uint64_t serial, const uint64_t rid, const uint64_t xidsize, const uint64_t dsize);
void enq_hdr_copy(enq_hdr_t* dest, const enq_hdr_t* src);
bool is_enq_transient(const enq_hdr_t *eh);
void set_enq_transient(enq_hdr_t *eh, const bool transient);
@@ -78,4 +80,4 @@ bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t versio
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
index dcc24bac8a..35b4ea219e 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
@@ -23,8 +23,8 @@
#include <string.h>
void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
- const uint16_t efp_partition, const uint64_t file_size) {
- rec_hdr_init(&dest->_rhdr, magic, version, 0, 0);
+ const uint16_t efp_partition, const uint64_t file_size) {
+ rec_hdr_init(&dest->_rhdr, magic, version, 0, 0, 0);
dest->_fhdr_size_sblks = fhdr_size_sblks;
dest->_efp_partition = efp_partition;
dest->_reserved = 0;
@@ -36,10 +36,11 @@ void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t vers
dest->_queue_name_len = 0;
}
-int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
- const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t serial, const uint64_t rid,
+ const uint64_t fro, const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
file_hdr_t* fhp = (file_hdr_t*)dest;
fhp->_rhdr._uflag = uflag;
+ fhp->_rhdr._serial = serial;
fhp->_rhdr._rid = rid;
fhp->_fro = fro;
fhp->_file_number = file_number;
@@ -54,6 +55,13 @@ int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, con
return set_time_now(dest);
}
+int file_hdr_check(file_hdr_t* hdr, const uint32_t magic, const uint16_t version, const uint64_t data_size_kib) {
+ int res = rec_hdr_check_base(&hdr->_rhdr, magic, version);
+ if (res != 0) return 0;
+ if (hdr->_data_size_kib != data_size_kib) return 3;
+ return 0;
+}
+
void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) {
rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied?
@@ -84,6 +92,23 @@ int is_file_hdr_reset(file_hdr_t* target) {
target->_queue_name_len == 0;
}
+/*
+uint64_t random_64() {
+ int randomData = open("/dev/random", O_RDONLY);
+ if (randomData < 0) {
+ return 0ULL;
+ }
+ uint64_t randomNumber;
+ size_t size = sizeof(randomNumber);
+ ssize_t result = read(randomData, (char*)&randomNumber, size);
+ if (result != size) {
+ randomNumber = 0ULL;
+ }
+ close(randomData);
+ return randomNumber;
+}
+*/
+
int set_time_now(file_hdr_t *fh)
{
struct timespec ts;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
index 6a53c631d5..53ca686fb8 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -41,17 +41,19 @@ extern "C"{
* block in the file. The record ID and offset are updated on each overwrite of the
* file.
*
- * File header info in binary format (66 bytes + size of file name in octets):
+ * File header info in binary format (74 bytes + size of file name in octets):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+ -+
* | magic | ver | flags | |
- * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
- * | first rid in file | |
+ * +---+---+---+---+---+---+---+---+ |
+ * | serial | | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
+ * | rid | |
* +---+---+---+---+---+---+---+---+ -+
* | fs | partn | reserved |
* +---+---+---+---+---+---+---+---+
- * | file-size |
+ * | data-size |
* +---+---+---+---+---+---+---+---+
* | fro |
* +---+---+---+---+---+---+---+---+
@@ -87,10 +89,11 @@ typedef struct file_hdr_t {
uint16_t _queue_name_len; /**< Length of the queue name in octets, which follows this struct in the header */
} file_hdr_t;
-void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
- const uint16_t efp_partition, const uint64_t file_size);
-int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
- const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name);
+void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version,
+ const uint16_t fhdr_size_sblks, const uint16_t efp_partition, const uint64_t file_size);
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t serial, const uint64_t rid,
+ const uint64_t fro, const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name);
+int file_hdr_check(file_hdr_t* hdr, const uint32_t magic, const uint16_t version, const uint64_t data_size_kib);
void file_hdr_reset(file_hdr_t* target);
int is_file_hdr_reset(file_hdr_t* target);
void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src);
@@ -103,4 +106,4 @@ void set_time(file_hdr_t *fh, struct timespec *ts);
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c
index be56068b9e..ad5262f9a3 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c
@@ -1,9 +1,10 @@
#include "rec_hdr.h"
-void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t rid) {
+void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t serial, const uint64_t rid) {
dest->_magic = magic;
dest->_version = version;
dest->_uflag = uflag;
+ dest->_serial = serial;
dest->_rid = rid;
}
@@ -11,5 +12,19 @@ void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src) {
dest->_magic = src->_magic;
dest->_version = src->_version;
dest->_uflag = src->_uflag;
+ dest->_serial = src->_serial;
dest->_rid = src->_rid;
}
+
+int rec_hdr_check_base(rec_hdr_t* header, const uint32_t magic, const uint16_t version) {
+ if (header->_magic != magic) return 1;
+ if (header->_version != version) return 2;
+ return 0;
+}
+
+int rec_hdr_check(rec_hdr_t* header, const uint32_t magic, const uint16_t version, const uint64_t serial) {
+ int res = rec_hdr_check_base(header, magic, version);
+ if (res != 0) return res;
+ if (header->_serial != serial) return 3;
+ return 0;
+}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h
index c843e21316..64349b5ab8 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -34,11 +34,13 @@ extern "C"{
* This includes identification for the file type, the encoding version, endian
* indicator and a record ID.
*
- * File header info in binary format (16 bytes):
+ * File header info in binary format (24 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+
- * | magic | ver | flags |
+ * | magic | ver | uflag |
+ * +---+---+---+---+---+---+---+---+
+ * | serial |
* +---+---+---+---+---+---+---+---+
* | rid |
* +---+---+---+---+---+---+---+---+
@@ -52,11 +54,14 @@ typedef struct rec_hdr_t {
uint32_t _magic; /**< File type identifier (magic number) */
uint16_t _version; /**< File encoding version */
uint16_t _uflag; /**< User-defined flags */
+ uint64_t _serial; /**< Serial number for this journal file */
uint64_t _rid; /**< Record ID (rotating 64-bit counter) */
} rec_hdr_t;
-void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t rid);
+void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t serial, const uint64_t rid);
void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src);
+int rec_hdr_check_base(rec_hdr_t* header, const uint32_t magic, const uint16_t version);
+int rec_hdr_check(rec_hdr_t* header, const uint32_t magic, const uint16_t version, const uint64_t serial);
#pragma pack()
@@ -64,4 +69,4 @@ void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src);
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c
index 2e604eb6c6..88c68e2b78 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c
@@ -21,14 +21,25 @@
#include "rec_tail.h"
-void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t rid) {
+void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t serial,
+ const uint64_t rid) {
dest->_xmagic = xmagic;
dest->_checksum = checksum;
+ dest->_serial = serial;
dest->_rid = rid;
}
void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum) {
dest->_xmagic = ~(src->_magic);
dest->_checksum = checksum;
+ dest->_serial = src->_serial;
dest->_rid = src->_rid;
}
+
+int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum) {
+ if (tail->_xmagic != ~header->_magic) return 1;
+ if (tail->_serial != header->_serial) return 2;
+ if (tail->_rid != header->_rid) return 3;
+ if (tail->_checksum != checksum) return 4;
+ return 0;
+}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h
index 9fbd186ea0..5163580ead 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h
@@ -1,5 +1,5 @@
-#ifndef QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H
-#define QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -41,26 +41,32 @@ extern "C"{
* The checksum is used to verify the xid and/or data portion of the record
* on recovery, and excludes the header and tail.
*
- * Record header info in binary format (16 bytes):
+ * Record header info in binary format (24 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+
* | ~(magic) | checksum |
* +---+---+---+---+---+---+---+---+
+ * | serial |
+ * +---+---+---+---+---+---+---+---+
* | rid |
* +---+---+---+---+---+---+---+---+
*
- * rid = Record ID
+ * ~(magic) = 1's compliment of magic of matching record header
+ * rid = Record ID of matching record header
* </pre>
*/
typedef struct rec_tail_t {
uint32_t _xmagic; /**< Binary inverse (1's complement) of hdr magic number */
- uint32_t _checksum; /**< Checksum of xid and data */
- uint64_t _rid; /**< ID (rotating 64-bit counter) */
+ uint32_t _checksum; /**< Checksum of xid and data (excluding header itself) */
+ uint64_t _serial; /**< Serial number for this journal file */
+ uint64_t _rid; /**< Record ID (rotating 64-bit counter) */
} rec_tail_t;
-void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t rid);
+void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t serial,
+ const uint64_t rid);
void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum);
+int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum);
#pragma pack()
@@ -68,4 +74,4 @@ void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checks
}
#endif
-#endif /* ifnedf QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H */
+#endif /* ifnedf QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c
index 3b94bf3bb8..58d4cdebe4 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c
@@ -22,8 +22,8 @@
#include "txn_hdr.h"
void txn_hdr_init(txn_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t xidsize) {
- rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+ const uint64_t serial, const uint64_t rid, const uint64_t xidsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid);
dest->_xidsize = xidsize;
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h
index 02c00e09b3..442a1d373d 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -40,12 +40,14 @@ extern "C"{
* Note that this record had its own rid distinct from the rids of the record(s) making up the
* transaction it is committing or aborting.
*
- * Record header info in binary format (24 bytes):
+ * Record header info in binary format (32 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+ -+
- * | magic | v | e | flags | |
- * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * | magic | ver | flags | |
+ * +---+---+---+---+---+---+---+---+ |
+ * | serial | | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
* | rid | |
* +---+---+---+---+---+---+---+---+ -+
* | xidsize |
@@ -58,7 +60,7 @@ typedef struct txn_hdr_t {
} txn_hdr_t;
void txn_hdr_init(txn_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t xidsize);
+ const uint64_t serial, const uint64_t rid, const uint64_t xidsize);
void txn_hdr_copy(txn_hdr_t* dest, const txn_hdr_t* src);
#pragma pack()
@@ -67,4 +69,4 @@ void txn_hdr_copy(txn_hdr_t* dest, const txn_hdr_t* src);
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
index 991e26a3b9..9f91132f80 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
@@ -35,10 +35,9 @@
//#include <iostream> // DEBUG
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
wmgr::wmgr(jcntl* jc,
enq_map& emap,
@@ -150,7 +149,7 @@ wmgr::enqueue(const void* const data_buff,
}
uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
- _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external);
+ _enq_rec.reset(_lfc.getCurrentSerial(), rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external);
if (!cont)
{
dtokp->set_rid(rid);
@@ -265,7 +264,7 @@ wmgr::dequeue(data_tok* dtokp,
const bool ext_rid = dtokp->external_rid();
uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _lfc.getNextRecordId();
uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
- _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit);
+ _deq_rec.reset(_lfc.getCurrentSerial(), rid, dequeue_rid, xid_ptr, xid_len, txn_coml_commit);
if (!cont)
{
if (!ext_rid)
@@ -391,7 +390,7 @@ wmgr::abort(data_tok* dtokp,
}
uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
- _txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
+ _txn_rec.reset(false, _lfc.getCurrentSerial(), rid, xid_ptr, xid_len);
if (!cont)
{
dtokp->set_rid(rid);
@@ -489,7 +488,7 @@ wmgr::commit(data_tok* dtokp,
}
uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
- _txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
+ _txn_rec.reset(true, _lfc.getCurrentSerial(), rid, xid_ptr, xid_len);
if (!cont)
{
dtokp->set_rid(rid);
@@ -1033,4 +1032,4 @@ wmgr::status_str() const
const char* wmgr::_op_str[] = {"enqueue", "dequeue", "abort", "commit"};
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
index 6b455febd9..0278317ae9 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_WMGR_H
-#define QPID_LEGACYSTORE_JRNL_WMGR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H
+#define QPID_LINEARSTORE_JOURNAL_WMGR_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class wmgr;
-}}
+}}}
#include <cstring>
#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
@@ -37,123 +36,123 @@ class wmgr;
class file_hdr_t;
-namespace qpid
-{
-namespace qls_jrnl
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+class LinearFileController;
+
+/**
+* \brief Class for managing a write page cache of arbitrary size and number of pages.
+*
+* The write page cache works on the principle of caching the write data within a page until
+* that page is either full or flushed; this initiates a single AIO write operation to store
+* the data on disk.
+*
+* The maximum disk throughput is achieved by keeping the write operations of uniform size.
+* Waiting for a page cache to fill achieves this; and in high data volume/throughput situations
+* achieves the optimal disk throughput. Calling flush() forces a write of the current page cache
+* no matter how full it is, and disrupts the uniformity of the write operations. This should
+* normally only be done if throughput drops and there is a danger of a page of unwritten data
+* waiting around for excessive time.
+*
+* The usual tradeoff between data storage latency and throughput performance applies.
+*/
+class wmgr : public pmgr
{
- class LinearFileController;
-
- /**
- * \brief Class for managing a write page cache of arbitrary size and number of pages.
- *
- * The write page cache works on the principle of caching the write data within a page until
- * that page is either full or flushed; this initiates a single AIO write operation to store
- * the data on disk.
- *
- * The maximum disk throughput is achieved by keeping the write operations of uniform size.
- * Waiting for a page cache to fill achieves this; and in high data volume/throughput situations
- * achieves the optimal disk throughput. Calling flush() forces a write of the current page cache
- * no matter how full it is, and disrupts the uniformity of the write operations. This should
- * normally only be done if throughput drops and there is a danger of a page of unwritten data
- * waiting around for excessive time.
- *
- * The usual tradeoff between data storage latency and throughput performance applies.
- */
- class wmgr : public pmgr
- {
- private:
- LinearFileController& _lfc; ///< Linear File Controller ref
- uint32_t _max_dtokpp; ///< Max data writes per page
- uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit
- uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
- std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
-
- // TODO: Convert _enq_busy etc into a proper threadsafe lock
- // TODO: Convert to enum? Are these encodes mutually exclusive?
- bool _enq_busy; ///< Flag true if enqueue is in progress
- bool _deq_busy; ///< Flag true if dequeue is in progress
- bool _abort_busy; ///< Flag true if abort is in progress
- bool _commit_busy; ///< Flag true if commit is in progress
-
- enum _op_type { WMGR_ENQUEUE = 0, WMGR_DEQUEUE, WMGR_ABORT, WMGR_COMMIT };
- static const char* _op_str[];
-
- enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
- deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
- txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
- std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
-
- public:
- wmgr(jcntl* jc,
- enq_map& emap,
- txn_map& tmap,
- LinearFileController& lfc);
- wmgr(jcntl* jc,
- enq_map& emap,
- txn_map& tmap,
- LinearFileController& lfc,
- const uint32_t max_dtokpp,
- const uint32_t max_iowait_us);
- virtual ~wmgr();
-
- void initialize(aio_callback* const cbp,
- const uint32_t wcache_pgsize_sblks,
- const uint16_t wcache_num_pages,
- const uint32_t max_dtokpp,
- const uint32_t max_iowait_us,
- std::size_t eo = 0);
- iores enqueue(const void* const data_buff,
- const std::size_t tot_data_len,
- const std::size_t this_data_len,
- data_tok* dtokp,
- const void* const xid_ptr,
- const std::size_t xid_len,
- const bool transient,
- const bool external);
- iores dequeue(data_tok* dtokp,
- const void* const xid_ptr,
- const std::size_t xid_len,
- const bool txn_coml_commit);
- iores abort(data_tok* dtokp,
- const void* const xid_ptr,
- const std::size_t xid_len);
- iores commit(data_tok* dtokp,
- const void* const xid_ptr,
- const std::size_t xid_len);
- iores flush();
- int32_t get_events(timespec* const timeout,
- bool flush);
- bool is_txn_synced(const std::string& xid);
- inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
- inline uint32_t unflushed_dblks() { return _cached_offset_dblks; }
-
- // Debug aid
- const std::string status_str() const;
-
- private:
- void initialize(aio_callback* const cbp,
- const uint32_t wcache_pgsize_sblks,
- const uint16_t wcache_num_pages);
- iores pre_write_check(const _op_type op,
- const data_tok* const dtokp,
- const std::size_t xidsize = 0,
- const std::size_t dsize = 0,
- const bool external = false) const;
- void dequeue_check(const std::string& xid,
- const uint64_t drid);
- void file_header_check(const uint64_t rid,
- const bool cont,
- const uint32_t rec_dblks_rem);
- void flush_check(iores& res,
- bool& cont,
- bool& done, const uint64_t rid);
- iores write_flush();
- void get_next_file();
- void dblk_roundup();
- void rotate_page();
- void clean();
- };
-
-}}
-
-#endif // ifndef QPID_LEGACYSTORE_JRNL_WMGR_H
+private:
+ LinearFileController& _lfc; ///< Linear File Controller ref
+ uint32_t _max_dtokpp; ///< Max data writes per page
+ uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit
+ uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
+ std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
+
+ // TODO: Convert _enq_busy etc into a proper threadsafe lock
+ // TODO: Convert to enum? Are these encodes mutually exclusive?
+ bool _enq_busy; ///< Flag true if enqueue is in progress
+ bool _deq_busy; ///< Flag true if dequeue is in progress
+ bool _abort_busy; ///< Flag true if abort is in progress
+ bool _commit_busy; ///< Flag true if commit is in progress
+
+ enum _op_type { WMGR_ENQUEUE = 0, WMGR_DEQUEUE, WMGR_ABORT, WMGR_COMMIT };
+ static const char* _op_str[];
+
+ enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
+ deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
+ txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
+ std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
+
+public:
+ wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc);
+ wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us);
+ virtual ~wmgr();
+
+ void initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us,
+ std::size_t eo = 0);
+ iores enqueue(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool transient,
+ const bool external);
+ iores dequeue(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool txn_coml_commit);
+ iores abort(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len);
+ iores commit(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len);
+ iores flush();
+ int32_t get_events(timespec* const timeout,
+ bool flush);
+ bool is_txn_synced(const std::string& xid);
+ inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
+ inline uint32_t unflushed_dblks() { return _cached_offset_dblks; }
+
+ // Debug aid
+ const std::string status_str() const;
+
+private:
+ void initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages);
+ iores pre_write_check(const _op_type op,
+ const data_tok* const dtokp,
+ const std::size_t xidsize = 0,
+ const std::size_t dsize = 0,
+ const bool external = false) const;
+ void dequeue_check(const std::string& xid,
+ const uint64_t drid);
+ void file_header_check(const uint64_t rid,
+ const bool cont,
+ const uint32_t rec_dblks_rem);
+ void flush_check(iores& res,
+ bool& cont,
+ bool& done, const uint64_t rid);
+ iores write_flush();
+ void get_next_file();
+ void dblk_roundup();
+ void rotate_page();
+ void clean();
+};
+
+}}}
+
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H