summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-11-18 17:44:32 +0000
committerKim van der Riet <kpvdr@apache.org>2013-11-18 17:44:32 +0000
commit0daaf7f017473712dc5fd0cb96a59b8c5b3f36b2 (patch)
treed36d71488550abbc93b11e55af6dae2040478285 /qpid/cpp/src
parent7995a381a48d623d245fc9fcd9a1b6996999d27a (diff)
downloadqpid-python-0daaf7f017473712dc5fd0cb96a59b8c5b3f36b2.tar.gz
QPID-4984: Cleanup of #includes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1543093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.cpp65
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.h22
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalLogImpl.h1
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp23
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h37
-rw-r--r--qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp6
-rw-r--r--qpid/cpp/src/qpid/linearstore/PreparedTransaction.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/StoreException.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/StorePlugin.cpp1
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp5
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.h22
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h7
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h11
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/JournalFile.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h5
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/aio.h1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/cvar.h75
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/data_tok.h3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_map.h133
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enums.h46
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp16
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jcntl.h1040
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jdir.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jdir.h7
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jexception.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jrec.h3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp8
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/pmgr.h144
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/slock.h67
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_map.h8
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp12
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.h10
48 files changed, 810 insertions, 1018 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
index 4092bc4ab9..081fab190d 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
@@ -21,25 +21,14 @@
#include "qpid/linearstore/JournalImpl.h"
+#include "qpid/linearstore/DataTokenImpl.h"
#include "qpid/linearstore/JournalLogImpl.h"
-#include "qpid/linearstore/journal/jerrno.h"
#include "qpid/linearstore/journal/jexception.h"
-#include "qpid/linearstore/journal/EmptyFilePool.h"
#include "qpid/linearstore/StoreException.h"
-#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Timer.h"
-#include "qmf/org/apache/qpid/linearstore/EventCreated.h"
-#include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h"
-#include "qmf/org/apache/qpid/linearstore/EventFull.h"
-#include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
-
-using namespace qpid::linearstore::journal;
-using namespace qpid::linearstore;
-using qpid::management::ManagementAgent;
-namespace _qmf = qmf::org::apache::qpid::linearstore;
+namespace qpid {
+namespace linearstore {
InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
@@ -87,7 +76,7 @@ JournalImpl::~JournalImpl()
if (deleteCallback) deleteCallback(*this);
if (_init_flag && !_stop_flag){
try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
- catch (const jexception& e) { QLS_LOG2(error, _jid, e.what()); }
+ catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); }
}
getEventsFireEventsPtr->cancel();
inactivityFireEventPtr->cancel();
@@ -106,8 +95,8 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
_agent = a;
if (_agent != 0)
{
- _mgmtObject = _qmf::Journal::shared_ptr (
- new _qmf::Journal(_agent, this));
+ _mgmtObject = ::qmf::org::apache::qpid::linearstore::Journal::shared_ptr (
+ new ::qmf::org::apache::qpid::linearstore::Journal(_agent, this));
_mgmtObject->set_name(_jid);
_mgmtObject->set_directory(_jdir.dirname());
@@ -163,11 +152,7 @@ JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_,
}
void
-JournalImpl::recover(/*const uint16_t num_jfiles,
- const bool auto_expand,
- const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/
- boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+JournalImpl::recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
qpid::linearstore::journal::aio_callback* const cbp,
@@ -211,8 +196,8 @@ JournalImpl::recover(/*const uint16_t num_jfiles,
if (prep_tx_list_ptr)
{
for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
- txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
- for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+ for (qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
if (tdl_itr->enq_flag_) { // enqueue op
i->enqueues->add(queue_id, tdl_itr->rid_);
} else { // dequeue op
@@ -253,7 +238,7 @@ JournalImpl::recover_complete()
void
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const bool transient)
+ const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient)
{
handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
@@ -265,7 +250,7 @@ JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_d
}
void
-JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
+JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
const bool transient)
{
handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
@@ -279,7 +264,7 @@ JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dto
void
JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
+ const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
@@ -296,7 +281,7 @@ JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t t
}
void
-JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
+JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
const std::string& xid, const bool transient)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
@@ -314,7 +299,7 @@ JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok*
}
void
-JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
+JournalImpl::dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit)
{
handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
@@ -327,7 +312,7 @@ JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_comm
}
void
-JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
+JournalImpl::dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
@@ -344,7 +329,7 @@ JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& x
}
void
-JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
+JournalImpl::txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid)
{
handleIoResult(jcntl::txn_abort(dtokp, xid));
@@ -356,7 +341,7 @@ JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
}
void
-JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
+JournalImpl::txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid)
{
handleIoResult(jcntl::txn_commit(dtokp, xid));
@@ -381,10 +366,10 @@ JournalImpl::stop(bool block_till_aio_cmpl)
}
}
-iores
+qpid::linearstore::journal::iores
JournalImpl::flush(const bool block_till_aio_cmpl)
{
- const iores res = jcntl::flush(block_till_aio_cmpl);
+ const qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl);
{
qpid::sys::Mutex::ScopedLock sl(_getf_lock);
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
@@ -420,20 +405,20 @@ JournalImpl::flushFire()
}
void
-JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl)
+JournalImpl::wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl)
{
- for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
+ for (std::vector<qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
{
DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
if (/*!is_stopped() &&*/ dtokp->getSourceMessage())
{
switch (dtokp->wstate())
{
- case data_tok::ENQ:
+ case qpid::linearstore::journal::data_tok::ENQ:
//std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
dtokp->getSourceMessage()->enqueueComplete();
break;
- case data_tok::DEQ:
+ case qpid::linearstore::journal::data_tok::DEQ:
//std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
dtokp->getSourceMessage()->dequeueComplete();
@@ -458,7 +443,7 @@ JournalImpl::createStore() {
}
void
-JournalImpl::handleIoResult(const iores r)
+JournalImpl::handleIoResult(const qpid::linearstore::journal::iores r)
{
writeActivityFlag = true;
switch (r)
@@ -496,3 +481,5 @@ qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /
return status;
}
+
+}}
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
index ec1fb99434..763089f3d0 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
@@ -22,33 +22,27 @@
#ifndef QPID_LINEARSTORE_JOURNALIMPL_H
#define QPID_LINEARSTORE_JOURNALIMPL_H
-#include <set>
-#include "qpid/linearstore/journal/enums.h"
-#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
+#include <boost/ptr_container/ptr_list.hpp>
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/linearstore/journal/aio_callback.h"
#include "qpid/linearstore/journal/jcntl.h"
-#include "qpid/linearstore/DataTokenImpl.h"
#include "qpid/linearstore/PreparedTransaction.h"
-#include "qpid/broker/PersistableQueue.h"
#include "qpid/sys/Timer.h"
-#include "qpid/sys/Time.h"
-#include <boost/ptr_container/ptr_list.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include "qpid/management/Manageable.h"
+
#include "qmf/org/apache/qpid/linearstore/Journal.h"
namespace qpid{
namespace sys {
-class Timer;
+//class Timer;
}
namespace linearstore{
-
-class JournalImpl;
-class JournalLogImpl;
namespace journal {
- class EmptyFilePool;
+// class EmptyFilePool;
}
+class JournalImpl;
+class JournalLogImpl;
class InactivityFireEvent : public qpid::sys::TimerTask
{
diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
index 85e977595d..36e7c7e410 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
@@ -21,6 +21,8 @@
#include "qpid/linearstore/JournalLogImpl.h"
+#include "qpid/log/Statement.h"
+
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
index e674c6dcc8..846eaac124 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
@@ -23,7 +23,6 @@
#define QPID_LINEARSTORE_LOG_H
#include "qpid/linearstore/journal/JournalLog.h"
-#include "qpid/log/Statement.h"
#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg)
#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \"" << queue << "\":" << msg)
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index dcb53ff55d..593d2bd4b7 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -21,28 +21,29 @@
#include "qpid/linearstore/MessageStoreImpl.h"
-#include "db-inc.h"
-#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/linearstore/BindingDbt.h"
#include "qpid/linearstore/BufferValue.h"
+#include "qpid/linearstore/Cursor.h"
+#include "qpid/linearstore/DataTokenImpl.h"
#include "qpid/linearstore/IdDbt.h"
+#include "qpid/linearstore/JournalImpl.h"
#include "qpid/linearstore/journal/EmptyFilePoolManager.h"
-#include "qpid/linearstore/journal/txn_map.h"
-#include "qpid/framing/FieldValue.h"
-#include "qmf/org/apache/qpid/linearstore/Package.h"
#include "qpid/linearstore/StoreException.h"
-#include <dirent.h>
+#include "qpid/linearstore/TxnCtxt.h"
+#include "qpid/log/Statement.h"
+#include "qmf/org/apache/qpid/linearstore/Package.h"
#define MAX_AIO_SLEEPS 100000 // tot: ~1 sec
#define AIO_SLEEP_TIME_US 10 // 0.01 ms
-namespace _qmf = qmf::org::apache::qpid::linearstore;
+//namespace _qmf = qmf::org::apache::qpid::linearstore;
namespace qpid{
namespace linearstore{
-
const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name
// FIXME aconway 2010-03-09: was 10
@@ -147,9 +148,9 @@ void MessageStoreImpl::initManagement ()
if (broker != 0) {
agent = broker->getManagementAgent();
if (agent != 0) {
- _qmf::Package packageInitializer(agent);
- mgmtObject = _qmf::Store::shared_ptr (
- new _qmf::Store(agent, this, broker));
+ qmf::org::apache::qpid::linearstore::Package packageInitializer(agent);
+ mgmtObject = qmf::org::apache::qpid::linearstore::Store::shared_ptr (
+ new qmf::org::apache::qpid::linearstore::Store(agent, this, broker));
mgmtObject->set_location(storeDir);
mgmtObject->set_tplIsInitialized(false);
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 6992c17bef..e995237d99 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -23,22 +23,14 @@
#define QPID_LINEARSTORE_MESSAGESTOREIMPL_H
#include <iomanip>
-#include <string>
-
-#include "db-inc.h"
-#include "qpid/linearstore/Cursor.h"
-#include "qpid/linearstore/IdDbt.h"
+#include "qpid/broker/MessageStore.h"
#include "qpid/linearstore/IdSequence.h"
-#include "qpid/linearstore/JournalImpl.h"
#include "qpid/linearstore/JournalLogImpl.h"
#include "qpid/linearstore/journal/jcfg.h"
#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
#include "qpid/linearstore/PreparedTransaction.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/MessageStore.h"
-#include "qpid/management/Manageable.h"
+
#include "qmf/org/apache/qpid/linearstore/Store.h"
-#include "qpid/linearstore/TxnCtxt.h"
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
@@ -46,15 +38,28 @@
#define DB_BUFFER_SMALL ENOMEM
#endif
-namespace qpid { namespace sys {
-class Timer;
-}}
+class Db;
+class DbEnv;
+class Dbt;
+class DbTxn;
-namespace qpid{
-namespace qls_jrnl {
-class EmptyFilePoolManager;
+namespace qpid {
+namespace broker {
+ class Broker;
+}
+namespace sys {
+ class Timer;
}
namespace linearstore{
+namespace journal {
+ class EmptyFilePool;
+ class EmptyFilePoolManager;
+}
+
+class IdDbt;
+class JournalImpl;
+class TplJournalImpl;
+class TxnCtxt;
/**
* An implementation of the MessageStore interface based on Berkeley DB
diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
index a353a339e3..1b92ca8c23 100644
--- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
+++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
@@ -20,10 +20,9 @@
*/
#include "qpid/linearstore/PreparedTransaction.h"
-#include <algorithm>
-using namespace qpid::linearstore;
-using std::string;
+namespace qpid {
+namespace linearstore {
void LockedMappings::add(queue_id queue, message_id message)
{
@@ -79,3 +78,4 @@ PreparedTransaction::PreparedTransaction(const std::string& _xid,
: xid(_xid), enqueues(_enqueues), dequeues(_dequeues) {}
+}}
diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
index 9e401c2a30..7b381ba3b9 100644
--- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
+++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
@@ -22,13 +22,11 @@
#ifndef QPID_LINEARSTORE_PREPAREDTRANSACTION_H
#define QPID_LINEARSTORE_PREPAREDTRANSACTION_H
+#include <boost/ptr_container/ptr_list.hpp>
+#include <boost/shared_ptr.hpp>
#include <list>
#include <map>
-#include <set>
#include <stdint.h>
-#include <string>
-#include <boost/shared_ptr.hpp>
-#include <boost/ptr_container/ptr_list.hpp>
namespace qpid{
namespace linearstore{
diff --git a/qpid/cpp/src/qpid/linearstore/StoreException.h b/qpid/cpp/src/qpid/linearstore/StoreException.h
index 5d577d5779..7a598a524f 100644
--- a/qpid/cpp/src/qpid/linearstore/StoreException.h
+++ b/qpid/cpp/src/qpid/linearstore/StoreException.h
@@ -22,8 +22,8 @@
#ifndef QPID_LINEARSTORE_STOREEXCEPTION_H
#define QPID_LINEARSTORE_STOREEXCEPTION_H
-#include "qpid/linearstore/IdDbt.h"
#include <boost/format.hpp>
+#include "db-inc.h"
namespace qpid{
namespace linearstore{
diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
index 9826240068..6dfb2056bf 100644
--- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
+++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
@@ -25,6 +25,7 @@
#include "qpid/DataDir.h"
#include "qpid/linearstore/JournalLogImpl.h"
#include "qpid/linearstore/MessageStoreImpl.h"
+#include "qpid/log/Statement.h"
using qpid::linearstore::MessageStoreImpl;
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
index 213d5865e1..50ef58aafa 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
@@ -21,9 +21,8 @@
#include "qpid/linearstore/TxnCtxt.h"
-#include <sstream>
-
-#include "qpid/linearstore/journal/jexception.h"
+#include "qpid/linearstore/DataTokenImpl.h"
+#include "qpid/linearstore/JournalImpl.h"
#include "qpid/linearstore/StoreException.h"
namespace qpid{
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
index 5a4350b128..4f95da5950 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
@@ -22,23 +22,21 @@
#ifndef QPID_LINEARSTORE_TXNCTXT_H
#define QPID_LINEARSTORE_TXNCTXT_H
-#include "db-inc.h"
-#include <memory>
-#include <set>
-#include <string>
-
-#include "qpid/linearstore/DataTokenImpl.h"
-#include "qpid/linearstore/IdSequence.h"
-#include "qpid/linearstore/JournalImpl.h"
-#include "qpid/broker/PersistableQueue.h"
+#include <boost/intrusive_ptr.hpp>
#include "qpid/broker/TransactionalStore.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/linearstore/IdSequence.h"
#include "qpid/sys/uuid.h"
-#include <boost/intrusive_ptr.hpp>
+class DbEnv;
+class DbTxn;
-namespace qpid{
+namespace qpid {
+namespace broker {
+ class ExternalQueueStore;
+}
namespace linearstore{
+ class DataTokenImpl;
+ class JournalImpl;
class TxnCtxt : public qpid::broker::TransactionContext
{
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
index 29f8b0b988..83db81dce8 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
@@ -21,12 +21,10 @@
#include "EmptyFilePool.h"
-#include <cctype>
#include <fstream>
#include "qpid/linearstore/journal/EmptyFilePoolPartition.h"
#include "qpid/linearstore/journal/jcfg.h"
#include "qpid/linearstore/journal/jdir.h"
-#include "qpid/linearstore/journal/JournalFile.h"
#include "qpid/linearstore/journal/JournalLog.h"
#include "qpid/linearstore/journal/slock.h"
#include "qpid/linearstore/journal/utils/file_hdr.h"
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
index ccaeb02567..5751c9d180 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
@@ -31,7 +31,6 @@ namespace journal {
#include <deque>
#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
#include "qpid/linearstore/journal/smutex.h"
-#include <string>
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
index 2aadb90a8a..ab70e3825e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
@@ -21,12 +21,11 @@
#include "EmptyFilePoolManager.h"
-#include <dirent.h>
+#include "qpid/linearstore/journal/EmptyFilePool.h"
#include "qpid/linearstore/journal/EmptyFilePoolPartition.h"
#include "qpid/linearstore/journal/jdir.h"
#include "qpid/linearstore/journal/JournalLog.h"
#include "qpid/linearstore/journal/slock.h"
-#include <vector>
//#include <iostream> // DEBUG
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
index beb884b91d..fa667f3326 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
@@ -23,13 +23,18 @@
#define QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_
#include <map>
-#include "qpid/linearstore/journal/EmptyFilePoolPartition.h"
+#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
#include "qpid/linearstore/journal/smutex.h"
+#include <vector>
namespace qpid {
namespace linearstore {
namespace journal {
+class EmptyFilePool;
+class EmptyFilePoolPartition;
+class JournalLog;
+
class EmptyFilePoolManager
{
protected:
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
index bcebd571c2..bef8d4e788 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
@@ -21,11 +21,9 @@
#include "qpid/linearstore/journal/EmptyFilePoolPartition.h"
-#include <dirent.h>
#include <iomanip>
+#include "qpid/linearstore/journal/EmptyFilePool.h"
#include "qpid/linearstore/journal/jdir.h"
-#include "qpid/linearstore/journal/jerrno.h"
-#include "qpid/linearstore/journal/jexception.h"
#include "qpid/linearstore/journal/slock.h"
//#include <iostream> // DEBUG
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
index 3afee79816..bd338461cb 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
@@ -22,22 +22,17 @@
#ifndef QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_
#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_
-namespace qpid {
-namespace linearstore {
-namespace journal {
- class EmptyFilePoolPartition;
-}}}
-
-#include "qpid/linearstore/journal/EmptyFilePool.h"
+#include <map>
#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
#include "qpid/linearstore/journal/smutex.h"
#include <string>
-#include <map>
#include <vector>
namespace qpid {
namespace linearstore {
namespace journal {
+
+class EmptyFilePool;
class JournalLog;
class EmptyFilePoolPartition
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
index d8e8225697..14213f7955 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
@@ -24,7 +24,6 @@
#include <iostream>
#include <stdint.h>
-#include <utility> // std::pair
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
index 15cbe33ad8..8f6a311a84 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
@@ -23,7 +23,6 @@
#include <fcntl.h>
#include "qpid/linearstore/journal/jcfg.h"
-#include "qpid/linearstore/journal/jexception.h"
#include "qpid/linearstore/journal/pmgr.h"
#include "qpid/linearstore/journal/utils/file_hdr.h"
#include <unistd.h>
diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
index ef97d10e3a..f0ad432fd8 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
@@ -25,8 +25,6 @@
#include "qpid/linearstore/journal/aio.h"
#include "qpid/linearstore/journal/AtomicCounter.h"
#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
-#include <stdint.h>
-#include <string>
class file_hdr_t;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp b/qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp
index 9a2438ae0b..c35ec97e91 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/linearstore/journal/JournalLog.h"
+
#include <iostream>
namespace qpid {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index 99883d2722..99b7f0f381 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -21,13 +21,9 @@
#include "qpid/linearstore/journal/LinearFileController.h"
-#include <fstream>
#include "qpid/linearstore/journal/EmptyFilePool.h"
-#include "qpid/linearstore/journal/jcfg.h"
#include "qpid/linearstore/journal/jcntl.h"
#include "qpid/linearstore/journal/JournalFile.h"
-#include "qpid/linearstore/journal/slock.h"
-#include "qpid/linearstore/journal/utils/file_hdr.h"
//#include <iostream> // DEBUG
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
index 436c07f289..61226d3015 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
@@ -23,13 +23,10 @@
#define QPID_LINEARSTORE_JOURNAL_LINEARFILECONTROLLER_H_
#include <deque>
+#include "qpid/linearstore/journal/aio.h"
#include "qpid/linearstore/journal/AtomicCounter.h"
#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
-// libaio forward declares
-typedef struct io_context* io_context_t;
-typedef struct iocb aio_cb;
-
namespace qpid {
namespace linearstore {
namespace journal {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 8ae82c7b80..22241aa164 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -26,6 +26,7 @@
#include <iomanip>
#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/deq_rec.h"
+#include "qpid/linearstore/journal/EmptyFilePool.h"
#include "qpid/linearstore/journal/EmptyFilePoolManager.h"
#include "qpid/linearstore/journal/enq_map.h"
#include "qpid/linearstore/journal/enq_rec.h"
diff --git a/qpid/cpp/src/qpid/linearstore/journal/aio.h b/qpid/cpp/src/qpid/linearstore/journal/aio.h
index 3a4a762439..54e3401748 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/aio.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/aio.h
@@ -24,7 +24,6 @@
#include <libaio.h>
#include <cstring>
-#include <string.h>
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/cvar.h b/qpid/cpp/src/qpid/linearstore/journal/cvar.h
deleted file mode 100644
index 89ba8cbb13..0000000000
--- a/qpid/cpp/src/qpid/linearstore/journal/cvar.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H
-#define QPID_LINEARSTORE_JOURNAL_CVAR_H
-
-#include <cstring>
-#include "qpid/linearstore/journal/jerrno.h"
-#include "qpid/linearstore/journal/jexception.h"
-#include "qpid/linearstore/journal/smutex.h"
-#include "qpid/linearstore/journal/time_ns.h"
-#include <pthread.h>
-#include <sstream>
-
-namespace qpid {
-namespace linearstore {
-namespace journal {
-
- // Ultra-simple thread condition variable class
- class cvar
- {
- private:
- const smutex& _sm;
- pthread_cond_t _c;
- public:
- inline cvar(const smutex& sm) : _sm(sm) { ::pthread_cond_init(&_c, 0); }
- inline ~cvar() { ::pthread_cond_destroy(&_c); }
- inline void wait()
- {
- PTHREAD_CHK(::pthread_cond_wait(&_c, _sm.get()), "::pthread_cond_wait", "cvar", "wait");
- }
- inline void timedwait(timespec& ts)
- {
- PTHREAD_CHK(::pthread_cond_timedwait(&_c, _sm.get(), &ts), "::pthread_cond_timedwait", "cvar", "timedwait");
- }
- inline bool waitintvl(const long intvl_ns)
- {
- time_ns t; t.now(); t+=intvl_ns;
- int ret = ::pthread_cond_timedwait(&_c, _sm.get(), &t);
- if (ret == ETIMEDOUT)
- return true;
- PTHREAD_CHK(ret, "::pthread_cond_timedwait", "cvar", "waitintvl");
- return false;
- }
- inline void signal()
- {
- PTHREAD_CHK(::pthread_cond_signal(&_c), "::pthread_cond_signal", "cvar", "notify");
- }
- inline void broadcast()
- {
- PTHREAD_CHK(::pthread_cond_broadcast(&_c), "::pthread_cond_broadcast", "cvar", "broadcast");
- }
- };
-
-}}}
-
-#endif // ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H
diff --git a/qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp b/qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp
index 830720f9d7..3952c403a1 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp
@@ -22,10 +22,7 @@
#include "qpid/linearstore/journal/data_tok.h"
#include <iomanip>
-#include "qpid/linearstore/journal/jerrno.h"
-#include "qpid/linearstore/journal/jexception.h"
#include "qpid/linearstore/journal/slock.h"
-#include <sstream>
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/data_tok.h b/qpid/cpp/src/qpid/linearstore/journal/data_tok.h
index ff854730d2..67e0ec9683 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/data_tok.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/data_tok.h
@@ -29,10 +29,7 @@ class data_tok;
}}}
#include <cassert>
-#include <cstddef>
#include "qpid/linearstore/journal/smutex.h"
-#include <pthread.h>
-#include <string>
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
index 0a9fee20f2..0da8d439af 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
@@ -23,7 +23,6 @@
#include <cassert>
#include <cstring>
-#include <iomanip>
#include "qpid/linearstore/journal/jexception.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp
index 2ccd271865..4eaaa64992 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp
@@ -21,11 +21,7 @@
#include "qpid/linearstore/journal/enq_map.h"
-#include <iomanip>
-#include "qpid/linearstore/journal/jerrno.h"
#include "qpid/linearstore/journal/slock.h"
-#include <sstream>
-
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_map.h b/qpid/cpp/src/qpid/linearstore/journal/enq_map.h
index 97bad07ec6..912a583ab9 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_map.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_map.h
@@ -22,88 +22,79 @@
#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
#define QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
-namespace qpid {
-namespace linearstore {
-namespace journal {
-class enq_map;
-}}}
-
-#include "qpid/linearstore/journal/jexception.h"
#include "qpid/linearstore/journal/smutex.h"
-#include <map>
-#include <pthread.h>
#include <vector>
namespace qpid {
namespace linearstore {
namespace journal {
- /**
- * \class enq_map
- * \brief Class for storing the physical file id (pfid) and a transaction locked flag for each enqueued
- * data block using the record id (rid) as a key. This is the primary mechanism for
- * deterimining the enqueue low water mark: if a pfid exists in this map, then there is
- * at least one still-enqueued record in that file. (The transaction map must also be
- * clear, however.)
- *
- * Map rids against pfid and lock status. As records are enqueued, they are added to this
- * map, and as they are dequeued, they are removed. An enqueue is locked when a transactional
- * dequeue is pending that has been neither committed nor aborted.
- * <pre>
- * key data
- *
- * rid1 --- [ pfid, txn_lock ]
- * rid2 --- [ pfid, txn_lock ]
- * rid3 --- [ pfid, txn_lock ]
- * ...
- * </pre>
- */
- class enq_map
- {
- public:
- // return/error codes
- static short EMAP_DUP_RID;
- static short EMAP_LOCKED;
- static short EMAP_RID_NOT_FOUND;
- static short EMAP_OK;
- static short EMAP_FALSE;
- static short EMAP_TRUE;
+/**
+* \class enq_map
+* \brief Class for storing the physical file id (pfid) and a transaction locked flag for each enqueued
+* data block using the record id (rid) as a key. This is the primary mechanism for
+* deterimining the enqueue low water mark: if a pfid exists in this map, then there is
+* at least one still-enqueued record in that file. (The transaction map must also be
+* clear, however.)
+*
+* Map rids against pfid and lock status. As records are enqueued, they are added to this
+* map, and as they are dequeued, they are removed. An enqueue is locked when a transactional
+* dequeue is pending that has been neither committed nor aborted.
+* <pre>
+* key data
+*
+* rid1 --- [ pfid, txn_lock ]
+* rid2 --- [ pfid, txn_lock ]
+* rid3 --- [ pfid, txn_lock ]
+* ...
+* </pre>
+*/
+class enq_map
+{
+public:
+ // return/error codes
+ static short EMAP_DUP_RID;
+ static short EMAP_LOCKED;
+ static short EMAP_RID_NOT_FOUND;
+ static short EMAP_OK;
+ static short EMAP_FALSE;
+ static short EMAP_TRUE;
- typedef struct emap_data_struct_t {
- uint64_t _pfid;
- std::streampos _file_posn;
- bool _lock;
- emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {}
- emap_data_struct_t(const uint64_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
- } emqp_data_struct_t;
- typedef std::pair<uint64_t, emap_data_struct_t> emap_param;
- typedef std::map<uint64_t, emap_data_struct_t> emap;
- typedef emap::iterator emap_itr;
+ typedef struct emap_data_struct_t {
+ uint64_t _pfid;
+ std::streampos _file_posn;
+ bool _lock;
+ emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {}
+ emap_data_struct_t(const uint64_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
+ } emqp_data_struct_t;
+ typedef std::pair<uint64_t, emap_data_struct_t> emap_param;
+ typedef std::map<uint64_t, emap_data_struct_t> emap;
+ typedef emap::iterator emap_itr;
- private:
- emap _map;
- smutex _mutex;
+private:
+ emap _map;
+ smutex _mutex;
- public:
- enq_map();
- virtual ~enq_map();
+public:
+ enq_map();
+ virtual ~enq_map();
- short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid;
- short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
- short get_pfid(const uint64_t rid, uint64_t& pfid); // >=0=pfid; -1=rid not found; -2=locked
- short get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked
- short get_file_posn(const uint64_t rid, std::streampos& file_posn); // -1=rid not found; -2=locked
- short get_data(const uint64_t rid, emap_data_struct_t& eds);
- bool is_enqueued(const uint64_t rid, bool ignore_lock = false);
- short lock(const uint64_t rid); // 0=ok; -1=rid not found
- short unlock(const uint64_t rid); // 0=ok; -1=rid not found
- short is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found
- inline void clear() { _map.clear(); }
- inline bool empty() const { return _map.empty(); }
- inline uint32_t size() const { return uint32_t(_map.size()); }
- void rid_list(std::vector<uint64_t>& rv);
- void pfid_list(std::vector<uint64_t>& fv);
- };
+ short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid;
+ short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
+ short get_pfid(const uint64_t rid, uint64_t& pfid); // >=0=pfid; -1=rid not found; -2=locked
+ short get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked
+ short get_file_posn(const uint64_t rid, std::streampos& file_posn); // -1=rid not found; -2=locked
+ short get_data(const uint64_t rid, emap_data_struct_t& eds);
+ bool is_enqueued(const uint64_t rid, bool ignore_lock = false);
+ short lock(const uint64_t rid); // 0=ok; -1=rid not found
+ short unlock(const uint64_t rid); // 0=ok; -1=rid not found
+ short is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found
+ inline void clear() { _map.clear(); }
+ inline bool empty() const { return _map.empty(); }
+ inline uint32_t size() const { return uint32_t(_map.size()); }
+ void rid_list(std::vector<uint64_t>& rv);
+ void pfid_list(std::vector<uint64_t>& fv);
+};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
index f75fc2228d..9dcb2d616e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
@@ -23,7 +23,6 @@
#include <cassert>
#include <cstring>
-#include <iomanip>
#include "qpid/linearstore/journal/jexception.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enums.h b/qpid/cpp/src/qpid/linearstore/journal/enums.h
index 106f58cf5f..90ec355955 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enums.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/enums.h
@@ -26,32 +26,32 @@ namespace qpid {
namespace linearstore {
namespace journal {
- // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
- /**
- * \brief Enumeration of possible return states from journal read and write operations.
- */
- enum _iores
- {
- RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed noramlly.
- RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
- RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
- RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read.
- RHM_IORES_TXPENDING ///< Operation blocked by pending transaction.
- };
- typedef _iores iores;
+// TODO: Change this to flags, as multiple of these conditions may exist simultaneously
+/**
+* \brief Enumeration of possible return states from journal read and write operations.
+*/
+enum _iores
+{
+ RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed noramlly.
+ RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
+ RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
+ RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read.
+ RHM_IORES_TXPENDING ///< Operation blocked by pending transaction.
+};
+typedef _iores iores;
- static inline const char* iores_str(iores res)
+static inline const char* iores_str(iores res)
+{
+ switch (res)
{
- switch (res)
- {
- case RHM_IORES_SUCCESS: return "RHM_IORES_SUCCESS";
- case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
- case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
- case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
- case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
- }
- return "<iores unknown>";
+ case RHM_IORES_SUCCESS: return "RHM_IORES_SUCCESS";
+ case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
+ case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
+ case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
+ case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
}
+ return "<iores unknown>";
+}
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
index c36a38d1d2..942feed824 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
@@ -21,22 +21,8 @@
#include "qpid/linearstore/journal/jcntl.h"
-#include <algorithm>
-#include <cassert>
-#include <cerrno>
-#include <cstdlib>
-#include <cstring>
-#include <fstream>
-#include <iomanip>
-#include <iostream>
-#include <qpid/linearstore/journal/EmptyFilePool.h>
-#include <qpid/linearstore/journal/EmptyFilePoolManager.h>
-#include "qpid/linearstore/journal/jerrno.h"
+#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/JournalLog.h"
-#include "qpid/linearstore/journal/utils/enq_hdr.h"
-#include <limits>
-#include <sstream>
-#include <unistd.h>
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
index 612b510cb3..c12c8afbc9 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
@@ -22,551 +22,543 @@
#ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H
#define QPID_LINEARSTORE_JOURNAL_JCNTL_H
-namespace qpid {
-namespace linearstore {
-namespace journal {
- class jcntl;
-}}}
-
-#include <cstddef>
-#include <deque>
#include <qpid/linearstore/journal/LinearFileController.h>
#include "qpid/linearstore/journal/jdir.h"
#include "qpid/linearstore/journal/RecoveryManager.h"
-#include "qpid/linearstore/journal/slock.h"
-#include "qpid/linearstore/journal/smutex.h"
#include "qpid/linearstore/journal/wmgr.h"
namespace qpid {
namespace linearstore {
namespace journal {
- class EmptyFilePool;
- class EmptyFilePoolManager;
+
+class EmptyFilePool;
+class EmptyFilePoolManager;
+class JournalLog;
+
+/**
+* \brief Access and control interface for the journal. This is the top-level class for the
+* journal.
+*
+* This is the top-level journal class; one instance of this class controls one instance of the
+* journal and all its files and associated control structures. Besides this class, the only
+* other class that needs to be used at a higher level is the data_tok class, one instance of
+* which is used per data block written to the journal, and is used to track its status through
+* the AIO enqueue, read and dequeue process.
+*/
+class jcntl
+{
+protected:
+ /**
+ * \brief Journal ID
+ *
+ * This string uniquely identifies this journal instance. It will most likely be associated
+ * with the identity of the message queue with which it is associated.
+ */
+ // TODO: This is not included in any files at present, add to file_hdr?
+ std::string _jid;
+
+ /**
+ * \brief Journal directory
+ *
+ * This string stores the path to the journal directory. It may be absolute or relative, and
+ * should not end in a file separator character. (e.g. "/fastdisk/jdata" is correct,
+ * "/fastdisk/jdata/" is not.)
+ */
+ jdir _jdir;
+
+ /**
+ * \brief Initialized flag
+ *
+ * This flag starts out set to false, is set to true once this object has been initialized,
+ * either by calling initialize() or recover().
+ */
+ bool _init_flag;
+
+ /**
+ * \brief Stopped flag
+ *
+ * This flag starts out false, and is set to true when stop() is called. At this point, the
+ * journal will no longer accept messages until either initialize() or recover() is called.
+ * There is no way other than through initialization to reset this flag.
+ */
+ // TODO: It would be helpful to distinguish between states stopping and stopped. If stop(true) is called,
+ // then we are stopping, but must wait for all outstanding aios to return before being finally stopped. During
+ // this period, however, no new enqueue/dequeue/read requests may be accepted.
+ bool _stop_flag;
+
+ /**
+ * \brief Read-only state flag used during recover.
+ *
+ * When true, this flag prevents journal write operations (enqueue and dequeue), but
+ * allows read to occur. It is used during recovery, and is reset when recovered() is
+ * called.
+ */
+ bool _readonly_flag;
+
+ // Journal control structures
+ JournalLog& _jrnl_log; ///< Ref to Journal Log instance
+ LinearFileController _linearFileController; ///< Linear File Controller
+ EmptyFilePool* _emptyFilePoolPtr; ///< Pointer to Empty File Pool for this queue
+ enq_map _emap; ///< Enqueue map for low water mark management
+ txn_map _tmap; ///< Transaction map open transactions
+ wmgr _wmgr; ///< Write page manager which manages AIO
+ RecoveryManager _recoveryManager; ///< Recovery data used for recovery
+ smutex _wr_mutex; ///< Mutex for journal writes
+
+public:
+ static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+ static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+
+ /**
+ * \brief Journal constructor.
+ *
+ * Constructor which sets the physical file location and base name.
+ *
+ * \param jid A unique identifier for this journal instance.
+ * \param jdir The directory which will contain the journal files.
+ * \param base_filename The string which will be used to start all journal filenames.
+ */
+ jcntl(const std::string& jid,
+ const std::string& jdir,
+ JournalLog& jrnl_log);
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~jcntl();
+
+ inline const std::string& id() const { return _jid; }
+
+ inline const std::string& jrnl_dir() const { return _jdir.dirname(); }
+
+ /**
+ * \brief Initialize the journal for storing data.
+ *
+ * Initialize the journal by creating new journal data files and initializing internal
+ * control structures. When complete, the journal will be empty, and ready to store data.
+ *
+ * <b>NOTE: Any existing journal will be ignored by this operation.</b> To use recover
+ * the data from an existing journal, use recover().
+ *
+ * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
+ * and deleted.</b>
+ *
+ * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
+ * used.</b>
+ *
+ * \param num_jfiles The number of journal files to be created.
+ * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically
+ * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then
+ * no files are added and an exception will be thrown if the journal runs out of file space.
+ * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the
+ * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If
+ * this number of files exist and the journal runs out of space, an exception will be thrown. This number
+ * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a
+ * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled.
+ * \param jfsize_sblks The size of each journal file expressed in softblocks.
+ * \param wcache_num_pages The number of write cache pages to create.
+ * \param wcache_pgsize_sblks The size in sblks of each write cache page.
+ * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL).
+ *
+ * \exception TODO
+ */
+ void initialize(EmptyFilePool* efpp,
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
+ aio_callback* const cbp);
/**
- * \brief Access and control interface for the journal. This is the top-level class for the
- * journal.
- *
- * This is the top-level journal class; one instance of this class controls one instance of the
- * journal and all its files and associated control structures. Besides this class, the only
- * other class that needs to be used at a higher level is the data_tok class, one instance of
- * which is used per data block written to the journal, and is used to track its status through
- * the AIO enqueue, read and dequeue process.
+ * /brief Initialize journal by recovering state from previously written journal.
+ *
+ * Initialize journal by recovering state from previously written journal. The journal files
+ * are analyzed, and all records that have not been dequeued and that remain in the journal
+ * will be available for reading. The journal is placed in a read-only state until
+ * recovered() is called; any calls to enqueue or dequeue will fail with an exception
+ * in this state.
+ *
+ * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
+ * and deleted.</b>
+ *
+ * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
+ * used.</b>
+ *
+ * \param num_jfiles The number of journal files to be created.
+ * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically
+ * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then
+ * no files are added and an exception will be thrown if the journal runs out of file space.
+ * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the
+ * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If
+ * this number of files exist and the journal runs out of space, an exception will be thrown. This number
+ * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a
+ * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled.
+ * \param jfsize_sblks The size of each journal file expressed in softblocks.
+ * \param wcache_num_pages The number of write cache pages to create.
+ * \param wcache_pgsize_sblks The size in sblks of each write cache page.
+ * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL).
+ * \param prep_txn_list_ptr
+ * \param highest_rid Returns the highest rid found in the journal during recover
+ *
+ * \exception TODO
*/
- class jcntl
- {
- protected:
- /**
- * \brief Journal ID
- *
- * This string uniquely identifies this journal instance. It will most likely be associated
- * with the identity of the message queue with which it is associated.
- */
- // TODO: This is not included in any files at present, add to file_hdr?
- std::string _jid;
-
- /**
- * \brief Journal directory
- *
- * This string stores the path to the journal directory. It may be absolute or relative, and
- * should not end in a file separator character. (e.g. "/fastdisk/jdata" is correct,
- * "/fastdisk/jdata/" is not.)
- */
- jdir _jdir;
-
- /**
- * \brief Initialized flag
- *
- * This flag starts out set to false, is set to true once this object has been initialized,
- * either by calling initialize() or recover().
- */
- bool _init_flag;
-
- /**
- * \brief Stopped flag
- *
- * This flag starts out false, and is set to true when stop() is called. At this point, the
- * journal will no longer accept messages until either initialize() or recover() is called.
- * There is no way other than through initialization to reset this flag.
- */
- // TODO: It would be helpful to distinguish between states stopping and stopped. If stop(true) is called,
- // then we are stopping, but must wait for all outstanding aios to return before being finally stopped. During
- // this period, however, no new enqueue/dequeue/read requests may be accepted.
- bool _stop_flag;
-
- /**
- * \brief Read-only state flag used during recover.
- *
- * When true, this flag prevents journal write operations (enqueue and dequeue), but
- * allows read to occur. It is used during recovery, and is reset when recovered() is
- * called.
- */
- bool _readonly_flag;
-
- // Journal control structures
- JournalLog& _jrnl_log; ///< Ref to Journal Log instance
- LinearFileController _linearFileController; ///< Linear File Controller
- EmptyFilePool* _emptyFilePoolPtr; ///< Pointer to Empty File Pool for this queue
- enq_map _emap; ///< Enqueue map for low water mark management
- txn_map _tmap; ///< Transaction map open transactions
- wmgr _wmgr; ///< Write page manager which manages AIO
- RecoveryManager _recoveryManager; ///< Recovery data used for recovery
- smutex _wr_mutex; ///< Mutex for journal writes
-
- public:
- static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
- static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
-
- /**
- * \brief Journal constructor.
- *
- * Constructor which sets the physical file location and base name.
- *
- * \param jid A unique identifier for this journal instance.
- * \param jdir The directory which will contain the journal files.
- * \param base_filename The string which will be used to start all journal filenames.
- */
- jcntl(const std::string& jid,
- const std::string& jdir,
- JournalLog& jrnl_log);
-
- /**
- * \brief Destructor.
- */
- virtual ~jcntl();
-
- inline const std::string& id() const { return _jid; }
-
- inline const std::string& jrnl_dir() const { return _jdir.dirname(); }
-
- /**
- * \brief Initialize the journal for storing data.
- *
- * Initialize the journal by creating new journal data files and initializing internal
- * control structures. When complete, the journal will be empty, and ready to store data.
- *
- * <b>NOTE: Any existing journal will be ignored by this operation.</b> To use recover
- * the data from an existing journal, use recover().
- *
- * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
- * and deleted.</b>
- *
- * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
- * used.</b>
- *
- * \param num_jfiles The number of journal files to be created.
- * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically
- * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then
- * no files are added and an exception will be thrown if the journal runs out of file space.
- * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the
- * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If
- * this number of files exist and the journal runs out of space, an exception will be thrown. This number
- * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a
- * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled.
- * \param jfsize_sblks The size of each journal file expressed in softblocks.
- * \param wcache_num_pages The number of write cache pages to create.
- * \param wcache_pgsize_sblks The size in sblks of each write cache page.
- * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL).
- *
- * \exception TODO
- */
- void initialize(EmptyFilePool* efpp,
- const uint16_t wcache_num_pages,
- const uint32_t wcache_pgsize_sblks,
- aio_callback* const cbp);
-
- /**
- * /brief Initialize journal by recovering state from previously written journal.
- *
- * Initialize journal by recovering state from previously written journal. The journal files
- * are analyzed, and all records that have not been dequeued and that remain in the journal
- * will be available for reading. The journal is placed in a read-only state until
- * recovered() is called; any calls to enqueue or dequeue will fail with an exception
- * in this state.
- *
- * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
- * and deleted.</b>
- *
- * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
- * used.</b>
- *
- * \param num_jfiles The number of journal files to be created.
- * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically
- * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then
- * no files are added and an exception will be thrown if the journal runs out of file space.
- * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the
- * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If
- * this number of files exist and the journal runs out of space, an exception will be thrown. This number
- * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a
- * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled.
- * \param jfsize_sblks The size of each journal file expressed in softblocks.
- * \param wcache_num_pages The number of write cache pages to create.
- * \param wcache_pgsize_sblks The size in sblks of each write cache page.
- * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL).
- * \param prep_txn_list_ptr
- * \param highest_rid Returns the highest rid found in the journal during recover
- *
- * \exception TODO
- */
- void recover(EmptyFilePoolManager* efpm,
- const uint16_t wcache_num_pages,
- const uint32_t wcache_pgsize_sblks,
- aio_callback* const cbp,
- const std::vector<std::string>* prep_txn_list_ptr,
- uint64_t& highest_rid);
-
- /**
- * \brief Notification to the journal that recovery is complete and that normal operation
- * may resume.
- *
- * This call notifies the journal that recovery is complete and that normal operation
- * may resume. The read pointers are reset so that all records read as a part of recover
- * may be re-read during normal operation. The read-only flag is then reset, allowing
- * enqueue and dequeue operations to resume.
- *
- * \exception TODO
- */
- void recover_complete();
-
- /**
- * \brief Stops journal and deletes all journal files.
- *
- * Clear the journal directory of all journal files matching the base filename.
- *
- * \exception TODO
- */
- void delete_jrnl_files();
-
- /**
- * \brief Enqueue data.
- *
- * Enqueue data or part thereof. If a large data block is being written, then it may be
- * enqueued in parts by setting this_data_len to the size of the data being written in this
- * call. The total data size must be known in advance, however, as this is written into the
- * record header on the first record write. The state of the write (i.e. how much has been
- * written so far) is maintained in the data token dtokp. Partial writes will return in state
- * ENQ_PART.
- *
- * Note that a return value of anything other than RHM_IORES_SUCCESS implies that this write
- * operation did not complete successfully or was partially completed. The action taken under
- * these conditions depends on the value of the return. For example, RHM_IORES_AIO_WAIT
- * implies that all pages in the write page cache are waiting for AIO operations to return,
- * and that the call should be remade after waiting a bit.
- *
- * Example: If a write of 99 kB is divided into three equal parts, then the following states
- * and returns would characterize a successful operation:
- * <pre>
- * dtok. dtok. dtok.
- * Pperation Return wstate() dsize() written() Comment
- * -----------------+--------+--------+-------+---------+------------------------------------
- * NONE 0 0 Value of dtok before op
- * edr(99000, 33000) SUCCESS ENQ_PART 99000 33000 Enqueue part 1
- * edr(99000, 33000) AIO_WAIT ENQ_PART 99000 50000 Enqueue part 2, not completed
- * edr(99000, 33000) SUCCESS ENQ_PART 99000 66000 Enqueue part 2 again
- * edr(99000, 33000) SUCCESS ENQ 99000 99000 Enqueue part 3
- * </pre>
- *
- * \param data_buff Pointer to data to be enqueued for this enqueue operation.
- * \param tot_data_len Total data length.
- * \param this_data_len Amount to be written in this enqueue operation.
- * \param dtokp Pointer to data token which contains the details of the enqueue operation.
- * \param transient Flag indicating transient persistence (ie, ignored on recover).
- *
- * \exception TODO
- */
- iores enqueue_data_record(const void* const data_buff,
+ void recover(EmptyFilePoolManager* efpm,
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
+ aio_callback* const cbp,
+ const std::vector<std::string>* prep_txn_list_ptr,
+ uint64_t& highest_rid);
+
+ /**
+ * \brief Notification to the journal that recovery is complete and that normal operation
+ * may resume.
+ *
+ * This call notifies the journal that recovery is complete and that normal operation
+ * may resume. The read pointers are reset so that all records read as a part of recover
+ * may be re-read during normal operation. The read-only flag is then reset, allowing
+ * enqueue and dequeue operations to resume.
+ *
+ * \exception TODO
+ */
+ void recover_complete();
+
+ /**
+ * \brief Stops journal and deletes all journal files.
+ *
+ * Clear the journal directory of all journal files matching the base filename.
+ *
+ * \exception TODO
+ */
+ void delete_jrnl_files();
+
+ /**
+ * \brief Enqueue data.
+ *
+ * Enqueue data or part thereof. If a large data block is being written, then it may be
+ * enqueued in parts by setting this_data_len to the size of the data being written in this
+ * call. The total data size must be known in advance, however, as this is written into the
+ * record header on the first record write. The state of the write (i.e. how much has been
+ * written so far) is maintained in the data token dtokp. Partial writes will return in state
+ * ENQ_PART.
+ *
+ * Note that a return value of anything other than RHM_IORES_SUCCESS implies that this write
+ * operation did not complete successfully or was partially completed. The action taken under
+ * these conditions depends on the value of the return. For example, RHM_IORES_AIO_WAIT
+ * implies that all pages in the write page cache are waiting for AIO operations to return,
+ * and that the call should be remade after waiting a bit.
+ *
+ * Example: If a write of 99 kB is divided into three equal parts, then the following states
+ * and returns would characterize a successful operation:
+ * <pre>
+ * dtok. dtok. dtok.
+ * Pperation Return wstate() dsize() written() Comment
+ * -----------------+--------+--------+-------+---------+------------------------------------
+ * NONE 0 0 Value of dtok before op
+ * edr(99000, 33000) SUCCESS ENQ_PART 99000 33000 Enqueue part 1
+ * edr(99000, 33000) AIO_WAIT ENQ_PART 99000 50000 Enqueue part 2, not completed
+ * edr(99000, 33000) SUCCESS ENQ_PART 99000 66000 Enqueue part 2 again
+ * edr(99000, 33000) SUCCESS ENQ 99000 99000 Enqueue part 3
+ * </pre>
+ *
+ * \param data_buff Pointer to data to be enqueued for this enqueue operation.
+ * \param tot_data_len Total data length.
+ * \param this_data_len Amount to be written in this enqueue operation.
+ * \param dtokp Pointer to data token which contains the details of the enqueue operation.
+ * \param transient Flag indicating transient persistence (ie, ignored on recover).
+ *
+ * \exception TODO
+ */
+ iores enqueue_data_record(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const bool transient = false);
+
+ iores enqueue_extern_data_record(const std::size_t tot_data_len,
+ data_tok* dtokp,
+ const bool transient = false);
+
+ /**
+ * \brief Enqueue data.
+ *
+ * \param data_buff Pointer to data to be enqueued for this enqueue operation.
+ * \param tot_data_len Total data length.
+ * \param this_data_len Amount to be written in this enqueue operation.
+ * \param dtokp Pointer to data token which contains the details of the enqueue operation.
+ * \param xid String containing xid. An empty string (i.e. length=0) will be considered
+ * non-transactional.
+ * \param transient Flag indicating transient persistence (ie, ignored on recover).
+ *
+ * \exception TODO
+ */
+ iores enqueue_txn_data_record(const void* const data_buff,
const std::size_t tot_data_len,
const std::size_t this_data_len,
data_tok* dtokp,
+ const std::string& xid,
const bool transient = false);
- iores enqueue_extern_data_record(const std::size_t tot_data_len,
+ iores enqueue_extern_txn_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
+ const std::string& xid,
const bool transient = false);
- /**
- * \brief Enqueue data.
- *
- * \param data_buff Pointer to data to be enqueued for this enqueue operation.
- * \param tot_data_len Total data length.
- * \param this_data_len Amount to be written in this enqueue operation.
- * \param dtokp Pointer to data token which contains the details of the enqueue operation.
- * \param xid String containing xid. An empty string (i.e. length=0) will be considered
- * non-transactional.
- * \param transient Flag indicating transient persistence (ie, ignored on recover).
- *
- * \exception TODO
- */
- iores enqueue_txn_data_record(const void* const data_buff,
- const std::size_t tot_data_len,
- const std::size_t this_data_len,
- data_tok* dtokp,
- const std::string& xid,
- const bool transient = false);
-
- iores enqueue_extern_txn_data_record(const std::size_t tot_data_len,
- data_tok* dtokp,
- const std::string& xid,
- const bool transient = false);
-
- /**
- * \brief Reads data from the journal. It is the responsibility of the reader to free
- * the memory that is allocated through this call - see below for details.
- *
- * Reads the next non-dequeued data record from the journal.
- *
- * <b>Note</b> that this call allocates memory into which the data and XID are copied. It
- * is the responsibility of the caller to free this memory. The memory for the data and
- * XID are allocated in a single call, and the XID precedes the data in the memory space.
- * Thus, where an XID exists, freeing the XID pointer will free both the XID and data memory.
- * However, if an XID does not exist for the message, the XID pointer xidpp is set to NULL,
- * and it is the data pointer datapp that must be freed. Should neither an XID nor data be
- * present (ie an empty record), then no memory is allocated, and both pointers will be NULL.
- * In this case, there is no need to free memory.
- *
- * TODO: Fix this lousy interface. The caller should NOT be required to clean up these
- * pointers! Rather use a struct, or better still, let the data token carry the data and
- * xid pointers and lengths, and have the data token both allocate and delete.
- *
- * \param datapp Pointer to pointer that will be set to point to memory allocated and
- * containing the data. Will be set to NULL if the call fails or there is no data
- * in the record.
- * \param dsize Ref that will be set to the size of the data. Will be set to 0 if the call
- * fails or if there is no data in the record.
- * \param xidpp Pointer to pointer that will be set to point to memory allocated and
- * containing the XID. Will be set to NULL if the call fails or there is no XID attached
- * to this record.
- * \param xidsize Ref that will be set to the size of the XID.
- * \param transient Ref that will be set true if record is transient.
- * \param external Ref that will be set true if record is external. In this case, the data
- * pointer datapp will be set to NULL, but dsize will contain the size of the data.
- * NOTE: If there is an xid, then xidpp must be freed.
- * \param dtokp Pointer to data_tok instance for this data, used to track state of data
- * through journal.
- * \param ignore_pending_txns When false (default), if the next record to be read is locked
- * by a pending transaction, the read fails with RHM_IORES_TXPENDING. However, if set
- * to true, then locks are ignored. This is required for reading of the Transaction
- * Prepared List (TPL) which may have its entries locked, but may be read from
- * time-to-time, and needs all its records (locked and unlocked) to be available.
- *
- * \exception TODO
- */
- iores read_data_record(void** const datapp,
- std::size_t& dsize,
- void** const xidpp,
- std::size_t& xidsize,
- bool& transient,
- bool& external,
- data_tok* const dtokp,
- bool ignore_pending_txns = false);
-
- /**
- * \brief Dequeues (marks as no longer needed) data record in journal.
- *
- * Dequeues (marks as no longer needed) data record in journal. Note that it is possible
- * to use the same data token instance used to enqueue this data; it contains the record ID
- * needed to correctly mark this data as dequeued in the journal. Otherwise the RID of the
- * record to be dequeued and the write state of ENQ must be manually set in a new or reset
- * instance of data_tok.
- *
- * \param dtokp Pointer to data_tok instance for this data, used to track state of data
- * through journal.
- * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
- * prepared XID list items, sets whether the complete() was called in commit or abort
- * mode.
- *
- * \exception TODO
- */
- iores dequeue_data_record(data_tok* const dtokp,
+ /**
+ * \brief Reads data from the journal. It is the responsibility of the reader to free
+ * the memory that is allocated through this call - see below for details.
+ *
+ * Reads the next non-dequeued data record from the journal.
+ *
+ * <b>Note</b> that this call allocates memory into which the data and XID are copied. It
+ * is the responsibility of the caller to free this memory. The memory for the data and
+ * XID are allocated in a single call, and the XID precedes the data in the memory space.
+ * Thus, where an XID exists, freeing the XID pointer will free both the XID and data memory.
+ * However, if an XID does not exist for the message, the XID pointer xidpp is set to NULL,
+ * and it is the data pointer datapp that must be freed. Should neither an XID nor data be
+ * present (ie an empty record), then no memory is allocated, and both pointers will be NULL.
+ * In this case, there is no need to free memory.
+ *
+ * TODO: Fix this lousy interface. The caller should NOT be required to clean up these
+ * pointers! Rather use a struct, or better still, let the data token carry the data and
+ * xid pointers and lengths, and have the data token both allocate and delete.
+ *
+ * \param datapp Pointer to pointer that will be set to point to memory allocated and
+ * containing the data. Will be set to NULL if the call fails or there is no data
+ * in the record.
+ * \param dsize Ref that will be set to the size of the data. Will be set to 0 if the call
+ * fails or if there is no data in the record.
+ * \param xidpp Pointer to pointer that will be set to point to memory allocated and
+ * containing the XID. Will be set to NULL if the call fails or there is no XID attached
+ * to this record.
+ * \param xidsize Ref that will be set to the size of the XID.
+ * \param transient Ref that will be set true if record is transient.
+ * \param external Ref that will be set true if record is external. In this case, the data
+ * pointer datapp will be set to NULL, but dsize will contain the size of the data.
+ * NOTE: If there is an xid, then xidpp must be freed.
+ * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+ * through journal.
+ * \param ignore_pending_txns When false (default), if the next record to be read is locked
+ * by a pending transaction, the read fails with RHM_IORES_TXPENDING. However, if set
+ * to true, then locks are ignored. This is required for reading of the Transaction
+ * Prepared List (TPL) which may have its entries locked, but may be read from
+ * time-to-time, and needs all its records (locked and unlocked) to be available.
+ *
+ * \exception TODO
+ */
+ iores read_data_record(void** const datapp,
+ std::size_t& dsize,
+ void** const xidpp,
+ std::size_t& xidsize,
+ bool& transient,
+ bool& external,
+ data_tok* const dtokp,
+ bool ignore_pending_txns = false);
+
+ /**
+ * \brief Dequeues (marks as no longer needed) data record in journal.
+ *
+ * Dequeues (marks as no longer needed) data record in journal. Note that it is possible
+ * to use the same data token instance used to enqueue this data; it contains the record ID
+ * needed to correctly mark this data as dequeued in the journal. Otherwise the RID of the
+ * record to be dequeued and the write state of ENQ must be manually set in a new or reset
+ * instance of data_tok.
+ *
+ * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+ * through journal.
+ * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+ * prepared XID list items, sets whether the complete() was called in commit or abort
+ * mode.
+ *
+ * \exception TODO
+ */
+ iores dequeue_data_record(data_tok* const dtokp,
+ const bool txn_coml_commit = false);
+
+ /**
+ * \brief Dequeues (marks as no longer needed) data record in journal.
+ *
+ * Dequeues (marks as no longer needed) data record in journal as part of a transaction.
+ * Note that it is possible to use the same data token instance used to enqueue this data;
+ * it contains the RID needed to correctly mark this data as dequeued in the journal.
+ * Otherwise the RID of the record to be dequeued and the write state of ENQ must be
+ * manually set in a new or reset instance of data_tok.
+ *
+ * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+ * through journal.
+ * \param xid String containing xid. An empty string (i.e. length=0) will be considered
+ * non-transactional.
+ * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+ * prepared XID list items, sets whether the complete() was called in commit or abort
+ * mode.
+ *
+ * \exception TODO
+ */
+ iores dequeue_txn_data_record(data_tok* const dtokp,
+ const std::string& xid,
const bool txn_coml_commit = false);
- /**
- * \brief Dequeues (marks as no longer needed) data record in journal.
- *
- * Dequeues (marks as no longer needed) data record in journal as part of a transaction.
- * Note that it is possible to use the same data token instance used to enqueue this data;
- * it contains the RID needed to correctly mark this data as dequeued in the journal.
- * Otherwise the RID of the record to be dequeued and the write state of ENQ must be
- * manually set in a new or reset instance of data_tok.
- *
- * \param dtokp Pointer to data_tok instance for this data, used to track state of data
- * through journal.
- * \param xid String containing xid. An empty string (i.e. length=0) will be considered
- * non-transactional.
- * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
- * prepared XID list items, sets whether the complete() was called in commit or abort
- * mode.
- *
- * \exception TODO
- */
- iores dequeue_txn_data_record(data_tok* const dtokp,
- const std::string& xid,
- const bool txn_coml_commit = false);
-
- /**
- * \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
- *
- * Abort the transaction for all records enqueued with the matching xid. All enqueued records
- * are effectively deleted from the journal, and can not be read. All dequeued records remain
- * as though they had never been dequeued.
- *
- * \param dtokp Pointer to data_tok instance for this data, used to track state of data
- * through journal.
- * \param xid String containing xid.
- *
- * \exception TODO
- */
- iores txn_abort(data_tok* const dtokp,
- const std::string& xid);
-
- /**
- * \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
- *
- * Commit the transaction for all records enqueued with the matching xid. All enqueued
- * records are effectively released for reading and dequeueing. All dequeued records are
- * removed and can no longer be accessed.
- *
- * \param dtokp Pointer to data_tok instance for this data, used to track state of data
- * through journal.
- * \param xid String containing xid.
- *
- * \exception TODO
- */
- iores txn_commit(data_tok* const dtokp,
- const std::string& xid);
-
- /**
- * \brief Check whether all the enqueue records for the given xid have reached disk.
- *
- * \param xid String containing xid.
- *
- * \exception TODO
- */
- bool is_txn_synced(const std::string& xid);
-
- /**
- * \brief Forces a check for returned AIO write events.
- *
- * Forces a check for returned AIO write events. This is normally performed by enqueue() and
- * dequeue() operations, but if these operations cease, then this call needs to be made to
- * force the processing of any outstanding AIO operations.
- */
- int32_t get_wr_events(timespec* const timeout);
-
- /**
- * \brief Stop the journal from accepting any further requests to read or write data.
- *
- * This operation is used to stop the journal. This is the normal mechanism for bringing the
- * journal to an orderly stop. Any outstanding AIO operations or partially written pages in
- * the write page cache will by flushed and will complete.
- *
- * <b>Note:</b> The journal cannot be restarted without either initializing it or restoring
- * it.
- *
- * \param block_till_aio_cmpl If true, will block the thread while waiting for all
- * outstanding AIO operations to complete.
- */
- void stop(const bool block_till_aio_cmpl = false);
-
- /**
- * \brief Force a flush of the write page cache, creating a single AIO write operation.
- */
- iores flush(const bool block_till_aio_cmpl = false);
-
- inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe?
-
- inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); }
-
- uint32_t get_wr_outstanding_aio_dblks() const;
-
- uint32_t get_rd_outstanding_aio_dblks() const;
-
- LinearFileController& getLinearFileControllerRef();
-
- /**
- * \brief Check if a particular rid is enqueued. Note that this function will return
- * false if the rid is transactionally enqueued and is not committed, or if it is
- * locked (i.e. transactionally dequeued, but the dequeue has not been committed).
- */
- inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); }
-
- inline bool is_locked(const uint64_t rid) {
- if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK)
- return false;
- return _emap.is_locked(rid) == enq_map::EMAP_TRUE;
- }
-
- inline void enq_rid_list(std::vector<uint64_t>& rids) { _emap.rid_list(rids); }
-
- inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
-
- inline uint32_t get_open_txn_cnt() const { return _tmap.size(); }
-
- // TODO Make this a const, but txn_map must support const first.
- inline txn_map& get_txn_map() { return _tmap; }
-
- /**
- * \brief Check if the journal is stopped.
- *
- * \return <b><i>true</i></b> if the jouranl is stopped;
- * <b><i>false</i></b> otherwise.
- */
- inline bool is_stopped() { return _stop_flag; }
-
- /**
- * \brief Check if the journal is ready to read and write data.
- *
- * Checks if the journal is ready to read and write data. This function will return
- * <b><i>true</i></b> if the journal has been either initialized or restored, and the stop()
- * function has not been called since the initialization.
- *
- * Note that the journal may also be stopped if an internal error occurs (such as running out
- * of data journal file space).
- *
- * \return <b><i>true</i></b> if the journal is ready to read and write data;
- * <b><i>false</i></b> otherwise.
- */
- inline bool is_ready() const { return _init_flag && !_stop_flag; }
-
- inline bool is_read_only() const { return _readonly_flag; }
-
- /**
- * \brief Get the journal directory.
- *
- * This returns the journal directory as set during initialization. This is the directory
- * into which the journal files will be written.
- */
- inline const std::string& dirname() const { return _jdir.dirname(); }
-
- // Management instrumentation callbacks
- inline virtual void instr_incr_outstanding_aio_cnt() {}
- inline virtual void instr_decr_outstanding_aio_cnt() {}
-
- protected:
- static bool _init;
- static bool init_statics();
-
- /**
- * \brief Check status of journal before allowing write operations.
- */
- void check_wstatus(const char* fn_name) const;
-
- /**
- * \brief Check status of journal before allowing read operations.
- */
- void check_rstatus(const char* fn_name) const;
-
- /**
- * \brief Call that blocks while waiting for all outstanding AIOs to complete
- */
- void aio_cmpl_wait();
-
- /**
- * \brief Call that blocks until at least one message returns; used to wait for
- * AIO wait conditions to clear.
- */
- bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
- };
+ /**
+ * \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
+ *
+ * Abort the transaction for all records enqueued with the matching xid. All enqueued records
+ * are effectively deleted from the journal, and can not be read. All dequeued records remain
+ * as though they had never been dequeued.
+ *
+ * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+ * through journal.
+ * \param xid String containing xid.
+ *
+ * \exception TODO
+ */
+ iores txn_abort(data_tok* const dtokp,
+ const std::string& xid);
+
+ /**
+ * \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
+ *
+ * Commit the transaction for all records enqueued with the matching xid. All enqueued
+ * records are effectively released for reading and dequeueing. All dequeued records are
+ * removed and can no longer be accessed.
+ *
+ * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+ * through journal.
+ * \param xid String containing xid.
+ *
+ * \exception TODO
+ */
+ iores txn_commit(data_tok* const dtokp,
+ const std::string& xid);
+
+ /**
+ * \brief Check whether all the enqueue records for the given xid have reached disk.
+ *
+ * \param xid String containing xid.
+ *
+ * \exception TODO
+ */
+ bool is_txn_synced(const std::string& xid);
+
+ /**
+ * \brief Forces a check for returned AIO write events.
+ *
+ * Forces a check for returned AIO write events. This is normally performed by enqueue() and
+ * dequeue() operations, but if these operations cease, then this call needs to be made to
+ * force the processing of any outstanding AIO operations.
+ */
+ int32_t get_wr_events(timespec* const timeout);
+
+ /**
+ * \brief Stop the journal from accepting any further requests to read or write data.
+ *
+ * This operation is used to stop the journal. This is the normal mechanism for bringing the
+ * journal to an orderly stop. Any outstanding AIO operations or partially written pages in
+ * the write page cache will by flushed and will complete.
+ *
+ * <b>Note:</b> The journal cannot be restarted without either initializing it or restoring
+ * it.
+ *
+ * \param block_till_aio_cmpl If true, will block the thread while waiting for all
+ * outstanding AIO operations to complete.
+ */
+ void stop(const bool block_till_aio_cmpl = false);
+
+ /**
+ * \brief Force a flush of the write page cache, creating a single AIO write operation.
+ */
+ iores flush(const bool block_till_aio_cmpl = false);
+
+ inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe?
+
+ inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); }
+
+ uint32_t get_wr_outstanding_aio_dblks() const;
+
+ uint32_t get_rd_outstanding_aio_dblks() const;
+
+ LinearFileController& getLinearFileControllerRef();
+
+ /**
+ * \brief Check if a particular rid is enqueued. Note that this function will return
+ * false if the rid is transactionally enqueued and is not committed, or if it is
+ * locked (i.e. transactionally dequeued, but the dequeue has not been committed).
+ */
+ inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); }
+
+ inline bool is_locked(const uint64_t rid) {
+ if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK)
+ return false;
+ return _emap.is_locked(rid) == enq_map::EMAP_TRUE;
+ }
+
+ inline void enq_rid_list(std::vector<uint64_t>& rids) { _emap.rid_list(rids); }
+
+ inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
+
+ inline uint32_t get_open_txn_cnt() const { return _tmap.size(); }
+
+ // TODO Make this a const, but txn_map must support const first.
+ inline txn_map& get_txn_map() { return _tmap; }
+
+ /**
+ * \brief Check if the journal is stopped.
+ *
+ * \return <b><i>true</i></b> if the jouranl is stopped;
+ * <b><i>false</i></b> otherwise.
+ */
+ inline bool is_stopped() { return _stop_flag; }
+
+ /**
+ * \brief Check if the journal is ready to read and write data.
+ *
+ * Checks if the journal is ready to read and write data. This function will return
+ * <b><i>true</i></b> if the journal has been either initialized or restored, and the stop()
+ * function has not been called since the initialization.
+ *
+ * Note that the journal may also be stopped if an internal error occurs (such as running out
+ * of data journal file space).
+ *
+ * \return <b><i>true</i></b> if the journal is ready to read and write data;
+ * <b><i>false</i></b> otherwise.
+ */
+ inline bool is_ready() const { return _init_flag && !_stop_flag; }
+
+ inline bool is_read_only() const { return _readonly_flag; }
+
+ /**
+ * \brief Get the journal directory.
+ *
+ * This returns the journal directory as set during initialization. This is the directory
+ * into which the journal files will be written.
+ */
+ inline const std::string& dirname() const { return _jdir.dirname(); }
+
+ // Management instrumentation callbacks
+ inline virtual void instr_incr_outstanding_aio_cnt() {}
+ inline virtual void instr_decr_outstanding_aio_cnt() {}
+
+protected:
+ static bool _init;
+ static bool init_statics();
+
+ /**
+ * \brief Check status of journal before allowing write operations.
+ */
+ void check_wstatus(const char* fn_name) const;
+
+ /**
+ * \brief Check status of journal before allowing read operations.
+ */
+ void check_rstatus(const char* fn_name) const;
+
+ /**
+ * \brief Call that blocks while waiting for all outstanding AIOs to complete
+ */
+ void aio_cmpl_wait();
+
+ /**
+ * \brief Call that blocks until at least one message returns; used to wait for
+ * AIO wait conditions to clear.
+ */
+ bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
+};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp b/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
index 6f28a48a7f..896f44ceff 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
@@ -21,14 +21,10 @@
#include "qpid/linearstore/journal/jdir.h"
-#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <iomanip>
-#include "qpid/linearstore/journal/jcfg.h"
-#include "qpid/linearstore/journal/jerrno.h"
#include "qpid/linearstore/journal/jexception.h"
-#include <sstream>
#include <sys/stat.h>
#include <unistd.h>
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jdir.h b/qpid/cpp/src/qpid/linearstore/journal/jdir.h
index c13a5f5af0..86b16f8545 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jdir.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jdir.h
@@ -22,13 +22,6 @@
#ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H
#define QPID_LINEARSTORE_JOURNAL_JDIR_H
-namespace qpid {
-namespace linearstore {
-namespace journal {
-class jdir;
-}}}
-
-//#include "qpid/linearstore/jrnl/jinf.h"
#include <dirent.h>
#include <string>
#include <vector>
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jexception.cpp b/qpid/cpp/src/qpid/linearstore/journal/jexception.cpp
index eb6e80d2b3..49f486746a 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jexception.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jexception.cpp
@@ -22,8 +22,6 @@
#include "qpid/linearstore/journal/jexception.h"
#include <iomanip>
-#include <sstream>
-#include "qpid/linearstore/journal/jerrno.h"
#define CATLEN(p) MAX_MSG_SIZE - std::strlen(p) - 1
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jrec.h b/qpid/cpp/src/qpid/linearstore/journal/jrec.h
index 8a728bba29..7cb6df13a4 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jrec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jrec.h
@@ -22,14 +22,11 @@
#ifndef QPID_LINEARSTORE_JOURNAL_JREC_H
#define QPID_LINEARSTORE_JOURNAL_JREC_H
-#include <cstddef>
#include <fstream>
#include "qpid/linearstore/journal/jcfg.h"
#include <stdint.h>
-#include <string>
struct rec_hdr_t;
-struct rec_tail_t;
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp
index 11553aaf47..54755dd4ed 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp
@@ -21,14 +21,6 @@
#include "qpid/linearstore/journal/pmgr.h"
-#include <cerrno>
-#include <cstdlib>
-#include <cstring>
-#include "qpid/linearstore/journal/jcfg.h"
-#include "qpid/linearstore/journal/jcntl.h"
-#include "qpid/linearstore/journal/jerrno.h"
-#include <sstream>
-
namespace qpid {
namespace linearstore {
namespace journal {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/pmgr.h b/qpid/cpp/src/qpid/linearstore/journal/pmgr.h
index 5de253c3d3..157a6f0566 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/pmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/pmgr.h
@@ -22,17 +22,8 @@
#ifndef QPID_LINEARSTORE_JOURNAL_PMGR_H
#define QPID_LINEARSTORE_JOURNAL_PMGR_H
-namespace qpid {
-namespace linearstore {
-namespace journal {
- class pmgr;
- class jcntl;
-}}}
-
#include <deque>
#include "qpid/linearstore/journal/aio.h"
-#include "qpid/linearstore/journal/aio_callback.h"
-#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/deq_rec.h"
#include "qpid/linearstore/journal/enq_map.h"
#include "qpid/linearstore/journal/enq_rec.h"
@@ -43,83 +34,86 @@ namespace qpid {
namespace linearstore {
namespace journal {
- class JournalFile;
+class aio_callback;
+class data_tok;
+class jcntl;
+class JournalFile;
+/**
+* \brief Abstract class for managing either read or write page cache of arbitrary size and
+* number of cache_num_pages.
+*/
+class pmgr
+{
+public:
/**
- * \brief Abstract class for managing either read or write page cache of arbitrary size and
- * number of cache_num_pages.
+ * \brief Enumeration of possible stats of a page within a page cache.
*/
- class pmgr
+ enum page_state
{
- public:
- /**
- * \brief Enumeration of possible stats of a page within a page cache.
- */
- enum page_state
- {
- UNUSED, ///< A page is uninitialized, contains no data.
- IN_USE, ///< Page is in use.
- AIO_PENDING ///< An AIO request outstanding.
- };
+ UNUSED, ///< A page is uninitialized, contains no data.
+ IN_USE, ///< Page is in use.
+ AIO_PENDING ///< An AIO request outstanding.
+ };
- /**
- * \brief Page control block, carries control and state information for each page in the
- * cache.
- */
- struct page_cb
- {
- uint16_t _index; ///< Index of this page
- page_state _state; ///< Status of page
- uint64_t _frid; ///< First rid in page (used for fhdr init)
- uint32_t _wdblks; ///< Total number of dblks in page so far
- uint32_t _rdblks; ///< Total number of dblks in page
- std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
- JournalFile* _jfp; ///< Journal file for incrementing compl counts
- void* _pbuff; ///< Page buffer
+ /**
+ * \brief Page control block, carries control and state information for each page in the
+ * cache.
+ */
+ struct page_cb
+ {
+ uint16_t _index; ///< Index of this page
+ page_state _state; ///< Status of page
+ uint64_t _frid; ///< First rid in page (used for fhdr init)
+ uint32_t _wdblks; ///< Total number of dblks in page so far
+ uint32_t _rdblks; ///< Total number of dblks in page
+ std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
+ JournalFile* _jfp; ///< Journal file for incrementing compl counts
+ void* _pbuff; ///< Page buffer
- page_cb(uint16_t index); ///< Convenience constructor
- const char* state_str() const; ///< Return state as string for this pcb
- };
+ page_cb(uint16_t index); ///< Convenience constructor
+ const char* state_str() const; ///< Return state as string for this pcb
+ };
- protected:
- static const uint32_t _sblkSizeBytes; ///< Disk softblock size
- uint32_t _cache_pgsize_sblks; ///< Size of page cache cache_num_pages
- uint16_t _cache_num_pages; ///< Number of page cache cache_num_pages
- jcntl* _jc; ///< Pointer to journal controller
- enq_map& _emap; ///< Ref to enqueue map
- txn_map& _tmap; ///< Ref to transaction map
- void* _page_base_ptr; ///< Base pointer to page memory
- void** _page_ptr_arr; ///< Array of pointers to cache_num_pages in page memory
- page_cb* _page_cb_arr; ///< Array of page_cb structs
- aio_cb* _aio_cb_arr; ///< Array of iocb structs
- aio_event* _aio_event_arr; ///< Array of io_events
- io_context_t _ioctx; ///< AIO context for read/write operations
- uint16_t _pg_index; ///< Index of current page being used
- uint32_t _pg_cntr; ///< Page counter; determines if file rotation req'd
- uint32_t _pg_offset_dblks; ///< Page offset (used so far) in data blocks
- uint32_t _aio_evt_rem; ///< Remaining AIO events
- aio_callback* _cbp; ///< Pointer to callback object
+protected:
+ static const uint32_t _sblkSizeBytes; ///< Disk softblock size
+ uint32_t _cache_pgsize_sblks; ///< Size of page cache cache_num_pages
+ uint16_t _cache_num_pages; ///< Number of page cache cache_num_pages
+ jcntl* _jc; ///< Pointer to journal controller
+ enq_map& _emap; ///< Ref to enqueue map
+ txn_map& _tmap; ///< Ref to transaction map
+ void* _page_base_ptr; ///< Base pointer to page memory
+ void** _page_ptr_arr; ///< Array of pointers to cache_num_pages in page memory
+ page_cb* _page_cb_arr; ///< Array of page_cb structs
+ aio_cb* _aio_cb_arr; ///< Array of iocb structs
+ aio_event* _aio_event_arr; ///< Array of io_events
+ io_context_t _ioctx; ///< AIO context for read/write operations
+ uint16_t _pg_index; ///< Index of current page being used
+ uint32_t _pg_cntr; ///< Page counter; determines if file rotation req'd
+ uint32_t _pg_offset_dblks; ///< Page offset (used so far) in data blocks
+ uint32_t _aio_evt_rem; ///< Remaining AIO events
+ aio_callback* _cbp; ///< Pointer to callback object
- enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
- deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
- txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
+ enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
+ deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
+ txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
- public:
- pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
- virtual ~pmgr();
+public:
+ pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
+ virtual ~pmgr();
- virtual int32_t get_events(timespec* const timeout, bool flush) = 0;
- inline uint32_t get_aio_evt_rem() const { return _aio_evt_rem; }
- static const char* page_state_str(page_state ps);
- inline uint32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
- inline uint16_t cache_num_pages() const { return _cache_num_pages; }
+ virtual int32_t get_events(timespec* const timeout, bool flush) = 0;
+ inline uint32_t get_aio_evt_rem() const { return _aio_evt_rem; }
+ static const char* page_state_str(page_state ps);
+ inline uint32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
+ inline uint16_t cache_num_pages() const { return _cache_num_pages; }
- protected:
- virtual void initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks,
- const uint16_t cache_num_pages);
- virtual void rotate_page() = 0;
- virtual void clean();
- };
+protected:
+ virtual void initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks,
+ const uint16_t cache_num_pages);
+ virtual void rotate_page() = 0;
+ virtual void clean();
+};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/slock.h b/qpid/cpp/src/qpid/linearstore/journal/slock.h
index 17ed06dfce..12e9e2d08c 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/slock.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/slock.h
@@ -22,7 +22,6 @@
#ifndef QPID_LINEARSTORE_JOURNAL_SLOCK_H
#define QPID_LINEARSTORE_JOURNAL_SLOCK_H
-#include "qpid/linearstore/journal/jexception.h"
#include "qpid/linearstore/journal/smutex.h"
#include <pthread.h>
@@ -30,42 +29,42 @@ namespace qpid {
namespace linearstore {
namespace journal {
- // Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope
- class slock
+// Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope
+class slock
+{
+protected:
+ const smutex& _sm;
+public:
+ inline slock(const smutex& sm) : _sm(sm)
{
- protected:
- const smutex& _sm;
- public:
- inline slock(const smutex& sm) : _sm(sm)
- {
- PTHREAD_CHK(::pthread_mutex_lock(_sm.get()), "::pthread_mutex_lock", "slock", "slock");
- }
- inline ~slock()
- {
- PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "slock", "~slock");
- }
- };
+ PTHREAD_CHK(::pthread_mutex_lock(_sm.get()), "::pthread_mutex_lock", "slock", "slock");
+ }
+ inline ~slock()
+ {
+ PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "slock", "~slock");
+ }
+};
- // Ultra-simple scoped try-lock class, auto-releases mutex when it goes out-of-scope
- class stlock
+// Ultra-simple scoped try-lock class, auto-releases mutex when it goes out-of-scope
+class stlock
+{
+protected:
+ const smutex& _sm;
+ bool _locked;
+public:
+ inline stlock(const smutex& sm) : _sm(sm), _locked(false)
+ {
+ int ret = ::pthread_mutex_trylock(_sm.get());
+ _locked = (ret == 0); // check if lock obtained
+ if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "::pthread_mutex_trylock", "stlock", "stlock");
+ }
+ inline ~stlock()
{
- protected:
- const smutex& _sm;
- bool _locked;
- public:
- inline stlock(const smutex& sm) : _sm(sm), _locked(false)
- {
- int ret = ::pthread_mutex_trylock(_sm.get());
- _locked = (ret == 0); // check if lock obtained
- if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "::pthread_mutex_trylock", "stlock", "stlock");
- }
- inline ~stlock()
- {
- if (_locked)
- PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "stlock", "~stlock");
- }
- inline bool locked() const { return _locked; }
- };
+ if (_locked)
+ PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "stlock", "~stlock");
+ }
+ inline bool locked() const { return _locked; }
+};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
index 6d84c6c451..5e0a28814d 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
@@ -21,11 +21,7 @@
#include "qpid/linearstore/journal/txn_map.h"
-#include <iomanip>
-#include "qpid/linearstore/journal/jerrno.h"
-#include "qpid/linearstore/journal/jexception.h"
#include "qpid/linearstore/journal/slock.h"
-#include <sstream>
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
index ef01e1df92..0f8cd5f3d7 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
@@ -22,16 +22,8 @@
#ifndef QPID_LINEARSTORE_JOURNAL_TXN_MAP_H
#define QPID_LINEARSTORE_JOURNAL_TXN_MAP_H
-namespace qpid {
-namespace linearstore {
-namespace journal {
- class txn_map;
-}}}
-
#include "qpid/linearstore/journal/smutex.h"
#include <map>
-#include <pthread.h>
-#include <string>
#include <vector>
namespace qpid {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
index ed4853ba1b..37448f2a8d 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
@@ -23,7 +23,6 @@
#include <cassert>
#include <cstring>
-#include <iomanip>
#include "qpid/linearstore/journal/jexception.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
index 9af2764ab5..e308f4ab06 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
@@ -22,16 +22,12 @@
#include "qpid/linearstore/journal/wmgr.h"
#include <cassert>
-#include <cerrno>
-#include <cstdlib>
-#include <cstring>
-#include "qpid/linearstore/journal/utils/file_hdr.h"
-#include "qpid/linearstore/journal/jcfg.h"
+#include "qpid/linearstore/journal/aio_callback.h"
+#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/jcntl.h"
-#include "qpid/linearstore/journal/jerrno.h"
#include "qpid/linearstore/journal/JournalFile.h"
-#include <sstream>
-#include <stdint.h>
+#include "qpid/linearstore/journal/LinearFileController.h"
+#include "qpid/linearstore/journal/utils/file_hdr.h"
//#include <iostream> // DEBUG
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
index c55c0db577..0aa21ca545 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
@@ -22,20 +22,10 @@
#ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H
#define QPID_LINEARSTORE_JOURNAL_WMGR_H
-namespace qpid {
-namespace linearstore {
-namespace journal {
-class wmgr;
-}}}
-
-#include <cstring>
-#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
#include "qpid/linearstore/journal/enums.h"
#include "qpid/linearstore/journal/pmgr.h"
#include <set>
-class file_hdr_t;
-
namespace qpid {
namespace linearstore {
namespace journal {