diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-11-18 17:44:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-11-18 17:44:32 +0000 |
| commit | 0daaf7f017473712dc5fd0cb96a59b8c5b3f36b2 (patch) | |
| tree | d36d71488550abbc93b11e55af6dae2040478285 /qpid/cpp/src | |
| parent | 7995a381a48d623d245fc9fcd9a1b6996999d27a (diff) | |
| download | qpid-python-0daaf7f017473712dc5fd0cb96a59b8c5b3f36b2.tar.gz | |
QPID-4984: Cleanup of #includes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1543093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
48 files changed, 810 insertions, 1018 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index 4092bc4ab9..081fab190d 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -21,25 +21,14 @@ #include "qpid/linearstore/JournalImpl.h" +#include "qpid/linearstore/DataTokenImpl.h" #include "qpid/linearstore/JournalLogImpl.h" -#include "qpid/linearstore/journal/jerrno.h" #include "qpid/linearstore/journal/jexception.h" -#include "qpid/linearstore/journal/EmptyFilePool.h" #include "qpid/linearstore/StoreException.h" -#include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Timer.h" -#include "qmf/org/apache/qpid/linearstore/EventCreated.h" -#include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h" -#include "qmf/org/apache/qpid/linearstore/EventFull.h" -#include "qmf/org/apache/qpid/linearstore/EventRecovered.h" - -using namespace qpid::linearstore::journal; -using namespace qpid::linearstore; -using qpid::management::ManagementAgent; -namespace _qmf = qmf::org::apache::qpid::linearstore; +namespace qpid { +namespace linearstore { InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {} @@ -87,7 +76,7 @@ JournalImpl::~JournalImpl() if (deleteCallback) deleteCallback(*this); if (_init_flag && !_stop_flag){ try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete! - catch (const jexception& e) { QLS_LOG2(error, _jid, e.what()); } + catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); } } getEventsFireEventsPtr->cancel(); inactivityFireEventPtr->cancel(); @@ -106,8 +95,8 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) _agent = a; if (_agent != 0) { - _mgmtObject = _qmf::Journal::shared_ptr ( - new _qmf::Journal(_agent, this)); + _mgmtObject = ::qmf::org::apache::qpid::linearstore::Journal::shared_ptr ( + new ::qmf::org::apache::qpid::linearstore::Journal(_agent, this)); _mgmtObject->set_name(_jid); _mgmtObject->set_directory(_jdir.dirname()); @@ -163,11 +152,7 @@ JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_, } void -JournalImpl::recover(/*const uint16_t num_jfiles, - const bool auto_expand, - const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ - boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm, +JournalImpl::recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, qpid::linearstore::journal::aio_callback* const cbp, @@ -211,8 +196,8 @@ JournalImpl::recover(/*const uint16_t num_jfiles, if (prep_tx_list_ptr) { for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) { - txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found - for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { + qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found + for (qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { if (tdl_itr->enq_flag_) { // enqueue op i->enqueues->add(queue_id, tdl_itr->rid_); } else { // dequeue op @@ -253,7 +238,7 @@ JournalImpl::recover_complete() void JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, data_tok* dtokp, const bool transient) + const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient) { handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient)); @@ -265,7 +250,7 @@ JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_d } void -JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, +JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient) { handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)); @@ -279,7 +264,7 @@ JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dto void JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient) + const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient) { bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; @@ -296,7 +281,7 @@ JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t t } void -JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp, +JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient) { bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; @@ -314,7 +299,7 @@ JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* } void -JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit) +JournalImpl::dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit) { handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit)); @@ -327,7 +312,7 @@ JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_comm } void -JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit) +JournalImpl::dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit) { bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; @@ -344,7 +329,7 @@ JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& x } void -JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid) +JournalImpl::txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid) { handleIoResult(jcntl::txn_abort(dtokp, xid)); @@ -356,7 +341,7 @@ JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid) } void -JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid) +JournalImpl::txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid) { handleIoResult(jcntl::txn_commit(dtokp, xid)); @@ -381,10 +366,10 @@ JournalImpl::stop(bool block_till_aio_cmpl) } } -iores +qpid::linearstore::journal::iores JournalImpl::flush(const bool block_till_aio_cmpl) { - const iores res = jcntl::flush(block_till_aio_cmpl); + const qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl); { qpid::sys::Mutex::ScopedLock sl(_getf_lock); if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); } @@ -420,20 +405,20 @@ JournalImpl::flushFire() } void -JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl) +JournalImpl::wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl) { - for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++) + for (std::vector<qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++) { DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i); if (/*!is_stopped() &&*/ dtokp->getSourceMessage()) { switch (dtokp->wstate()) { - case data_tok::ENQ: + case qpid::linearstore::journal::data_tok::ENQ: //std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG dtokp->getSourceMessage()->enqueueComplete(); break; - case data_tok::DEQ: + case qpid::linearstore::journal::data_tok::DEQ: //std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG /* Don't need to signal until we have a way to ack completion of dequeue in AMQP dtokp->getSourceMessage()->dequeueComplete(); @@ -458,7 +443,7 @@ JournalImpl::createStore() { } void -JournalImpl::handleIoResult(const iores r) +JournalImpl::handleIoResult(const qpid::linearstore::journal::iores r) { writeActivityFlag = true; switch (r) @@ -496,3 +481,5 @@ qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t / return status; } + +}} diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index ec1fb99434..763089f3d0 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -22,33 +22,27 @@ #ifndef QPID_LINEARSTORE_JOURNALIMPL_H #define QPID_LINEARSTORE_JOURNALIMPL_H -#include <set> -#include "qpid/linearstore/journal/enums.h" -#include "qpid/linearstore/journal/EmptyFilePoolTypes.h" +#include <boost/ptr_container/ptr_list.hpp> +#include "qpid/broker/PersistableQueue.h" +#include "qpid/linearstore/journal/aio_callback.h" #include "qpid/linearstore/journal/jcntl.h" -#include "qpid/linearstore/DataTokenImpl.h" #include "qpid/linearstore/PreparedTransaction.h" -#include "qpid/broker/PersistableQueue.h" #include "qpid/sys/Timer.h" -#include "qpid/sys/Time.h" -#include <boost/ptr_container/ptr_list.hpp> -#include <boost/intrusive_ptr.hpp> -#include "qpid/management/Manageable.h" + #include "qmf/org/apache/qpid/linearstore/Journal.h" namespace qpid{ namespace sys { -class Timer; +//class Timer; } namespace linearstore{ - -class JournalImpl; -class JournalLogImpl; namespace journal { - class EmptyFilePool; +// class EmptyFilePool; } +class JournalImpl; +class JournalLogImpl; class InactivityFireEvent : public qpid::sys::TimerTask { diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp index 85e977595d..36e7c7e410 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp @@ -21,6 +21,8 @@ #include "qpid/linearstore/JournalLogImpl.h" +#include "qpid/log/Statement.h" + namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h index e674c6dcc8..846eaac124 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h @@ -23,7 +23,6 @@ #define QPID_LINEARSTORE_LOG_H #include "qpid/linearstore/journal/JournalLog.h" -#include "qpid/log/Statement.h" #define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg) #define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \"" << queue << "\":" << msg) diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index dcb53ff55d..593d2bd4b7 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -21,28 +21,29 @@ #include "qpid/linearstore/MessageStoreImpl.h" -#include "db-inc.h" -#include "qpid/broker/QueueSettings.h" +#include "qpid/broker/Broker.h" +#include "qpid/framing/FieldValue.h" #include "qpid/linearstore/BindingDbt.h" #include "qpid/linearstore/BufferValue.h" +#include "qpid/linearstore/Cursor.h" +#include "qpid/linearstore/DataTokenImpl.h" #include "qpid/linearstore/IdDbt.h" +#include "qpid/linearstore/JournalImpl.h" #include "qpid/linearstore/journal/EmptyFilePoolManager.h" -#include "qpid/linearstore/journal/txn_map.h" -#include "qpid/framing/FieldValue.h" -#include "qmf/org/apache/qpid/linearstore/Package.h" #include "qpid/linearstore/StoreException.h" -#include <dirent.h> +#include "qpid/linearstore/TxnCtxt.h" +#include "qpid/log/Statement.h" +#include "qmf/org/apache/qpid/linearstore/Package.h" #define MAX_AIO_SLEEPS 100000 // tot: ~1 sec #define AIO_SLEEP_TIME_US 10 // 0.01 ms -namespace _qmf = qmf::org::apache::qpid::linearstore; +//namespace _qmf = qmf::org::apache::qpid::linearstore; namespace qpid{ namespace linearstore{ - const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name // FIXME aconway 2010-03-09: was 10 @@ -147,9 +148,9 @@ void MessageStoreImpl::initManagement () if (broker != 0) { agent = broker->getManagementAgent(); if (agent != 0) { - _qmf::Package packageInitializer(agent); - mgmtObject = _qmf::Store::shared_ptr ( - new _qmf::Store(agent, this, broker)); + qmf::org::apache::qpid::linearstore::Package packageInitializer(agent); + mgmtObject = qmf::org::apache::qpid::linearstore::Store::shared_ptr ( + new qmf::org::apache::qpid::linearstore::Store(agent, this, broker)); mgmtObject->set_location(storeDir); mgmtObject->set_tplIsInitialized(false); diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 6992c17bef..e995237d99 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -23,22 +23,14 @@ #define QPID_LINEARSTORE_MESSAGESTOREIMPL_H #include <iomanip> -#include <string> - -#include "db-inc.h" -#include "qpid/linearstore/Cursor.h" -#include "qpid/linearstore/IdDbt.h" +#include "qpid/broker/MessageStore.h" #include "qpid/linearstore/IdSequence.h" -#include "qpid/linearstore/JournalImpl.h" #include "qpid/linearstore/JournalLogImpl.h" #include "qpid/linearstore/journal/jcfg.h" #include "qpid/linearstore/journal/EmptyFilePoolTypes.h" #include "qpid/linearstore/PreparedTransaction.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/MessageStore.h" -#include "qpid/management/Manageable.h" + #include "qmf/org/apache/qpid/linearstore/Store.h" -#include "qpid/linearstore/TxnCtxt.h" // Assume DB_VERSION_MAJOR == 4 #if (DB_VERSION_MINOR == 2) @@ -46,15 +38,28 @@ #define DB_BUFFER_SMALL ENOMEM #endif -namespace qpid { namespace sys { -class Timer; -}} +class Db; +class DbEnv; +class Dbt; +class DbTxn; -namespace qpid{ -namespace qls_jrnl { -class EmptyFilePoolManager; +namespace qpid { +namespace broker { + class Broker; +} +namespace sys { + class Timer; } namespace linearstore{ +namespace journal { + class EmptyFilePool; + class EmptyFilePoolManager; +} + +class IdDbt; +class JournalImpl; +class TplJournalImpl; +class TxnCtxt; /** * An implementation of the MessageStore interface based on Berkeley DB diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp index a353a339e3..1b92ca8c23 100644 --- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp +++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp @@ -20,10 +20,9 @@ */ #include "qpid/linearstore/PreparedTransaction.h" -#include <algorithm> -using namespace qpid::linearstore; -using std::string; +namespace qpid { +namespace linearstore { void LockedMappings::add(queue_id queue, message_id message) { @@ -79,3 +78,4 @@ PreparedTransaction::PreparedTransaction(const std::string& _xid, : xid(_xid), enqueues(_enqueues), dequeues(_dequeues) {} +}} diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h index 9e401c2a30..7b381ba3b9 100644 --- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h +++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h @@ -22,13 +22,11 @@ #ifndef QPID_LINEARSTORE_PREPAREDTRANSACTION_H #define QPID_LINEARSTORE_PREPAREDTRANSACTION_H +#include <boost/ptr_container/ptr_list.hpp> +#include <boost/shared_ptr.hpp> #include <list> #include <map> -#include <set> #include <stdint.h> -#include <string> -#include <boost/shared_ptr.hpp> -#include <boost/ptr_container/ptr_list.hpp> namespace qpid{ namespace linearstore{ diff --git a/qpid/cpp/src/qpid/linearstore/StoreException.h b/qpid/cpp/src/qpid/linearstore/StoreException.h index 5d577d5779..7a598a524f 100644 --- a/qpid/cpp/src/qpid/linearstore/StoreException.h +++ b/qpid/cpp/src/qpid/linearstore/StoreException.h @@ -22,8 +22,8 @@ #ifndef QPID_LINEARSTORE_STOREEXCEPTION_H #define QPID_LINEARSTORE_STOREEXCEPTION_H -#include "qpid/linearstore/IdDbt.h" #include <boost/format.hpp> +#include "db-inc.h" namespace qpid{ namespace linearstore{ diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp index 9826240068..6dfb2056bf 100644 --- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp +++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp @@ -25,6 +25,7 @@ #include "qpid/DataDir.h" #include "qpid/linearstore/JournalLogImpl.h" #include "qpid/linearstore/MessageStoreImpl.h" +#include "qpid/log/Statement.h" using qpid::linearstore::MessageStoreImpl; diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp index 213d5865e1..50ef58aafa 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp @@ -21,9 +21,8 @@ #include "qpid/linearstore/TxnCtxt.h" -#include <sstream> - -#include "qpid/linearstore/journal/jexception.h" +#include "qpid/linearstore/DataTokenImpl.h" +#include "qpid/linearstore/JournalImpl.h" #include "qpid/linearstore/StoreException.h" namespace qpid{ diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h index 5a4350b128..4f95da5950 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h @@ -22,23 +22,21 @@ #ifndef QPID_LINEARSTORE_TXNCTXT_H #define QPID_LINEARSTORE_TXNCTXT_H -#include "db-inc.h" -#include <memory> -#include <set> -#include <string> - -#include "qpid/linearstore/DataTokenImpl.h" -#include "qpid/linearstore/IdSequence.h" -#include "qpid/linearstore/JournalImpl.h" -#include "qpid/broker/PersistableQueue.h" +#include <boost/intrusive_ptr.hpp> #include "qpid/broker/TransactionalStore.h" -#include "qpid/sys/Mutex.h" +#include "qpid/linearstore/IdSequence.h" #include "qpid/sys/uuid.h" -#include <boost/intrusive_ptr.hpp> +class DbEnv; +class DbTxn; -namespace qpid{ +namespace qpid { +namespace broker { + class ExternalQueueStore; +} namespace linearstore{ + class DataTokenImpl; + class JournalImpl; class TxnCtxt : public qpid::broker::TransactionContext { diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp index 29f8b0b988..83db81dce8 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp @@ -21,12 +21,10 @@ #include "EmptyFilePool.h" -#include <cctype> #include <fstream> #include "qpid/linearstore/journal/EmptyFilePoolPartition.h" #include "qpid/linearstore/journal/jcfg.h" #include "qpid/linearstore/journal/jdir.h" -#include "qpid/linearstore/journal/JournalFile.h" #include "qpid/linearstore/journal/JournalLog.h" #include "qpid/linearstore/journal/slock.h" #include "qpid/linearstore/journal/utils/file_hdr.h" diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h index ccaeb02567..5751c9d180 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h @@ -31,7 +31,6 @@ namespace journal { #include <deque> #include "qpid/linearstore/journal/EmptyFilePoolTypes.h" #include "qpid/linearstore/journal/smutex.h" -#include <string> namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp index 2aadb90a8a..ab70e3825e 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp @@ -21,12 +21,11 @@ #include "EmptyFilePoolManager.h" -#include <dirent.h> +#include "qpid/linearstore/journal/EmptyFilePool.h" #include "qpid/linearstore/journal/EmptyFilePoolPartition.h" #include "qpid/linearstore/journal/jdir.h" #include "qpid/linearstore/journal/JournalLog.h" #include "qpid/linearstore/journal/slock.h" -#include <vector> //#include <iostream> // DEBUG diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h index beb884b91d..fa667f3326 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h @@ -23,13 +23,18 @@ #define QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ #include <map> -#include "qpid/linearstore/journal/EmptyFilePoolPartition.h" +#include "qpid/linearstore/journal/EmptyFilePoolTypes.h" #include "qpid/linearstore/journal/smutex.h" +#include <vector> namespace qpid { namespace linearstore { namespace journal { +class EmptyFilePool; +class EmptyFilePoolPartition; +class JournalLog; + class EmptyFilePoolManager { protected: diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp index bcebd571c2..bef8d4e788 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp @@ -21,11 +21,9 @@ #include "qpid/linearstore/journal/EmptyFilePoolPartition.h" -#include <dirent.h> #include <iomanip> +#include "qpid/linearstore/journal/EmptyFilePool.h" #include "qpid/linearstore/journal/jdir.h" -#include "qpid/linearstore/journal/jerrno.h" -#include "qpid/linearstore/journal/jexception.h" #include "qpid/linearstore/journal/slock.h" //#include <iostream> // DEBUG diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h index 3afee79816..bd338461cb 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h @@ -22,22 +22,17 @@ #ifndef QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_ #define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLPARTITION_H_ -namespace qpid { -namespace linearstore { -namespace journal { - class EmptyFilePoolPartition; -}}} - -#include "qpid/linearstore/journal/EmptyFilePool.h" +#include <map> #include "qpid/linearstore/journal/EmptyFilePoolTypes.h" #include "qpid/linearstore/journal/smutex.h" #include <string> -#include <map> #include <vector> namespace qpid { namespace linearstore { namespace journal { + +class EmptyFilePool; class JournalLog; class EmptyFilePoolPartition diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h index d8e8225697..14213f7955 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h @@ -24,7 +24,6 @@ #include <iostream> #include <stdint.h> -#include <utility> // std::pair namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp index 15cbe33ad8..8f6a311a84 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp @@ -23,7 +23,6 @@ #include <fcntl.h> #include "qpid/linearstore/journal/jcfg.h" -#include "qpid/linearstore/journal/jexception.h" #include "qpid/linearstore/journal/pmgr.h" #include "qpid/linearstore/journal/utils/file_hdr.h" #include <unistd.h> diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h index ef97d10e3a..f0ad432fd8 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h +++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h @@ -25,8 +25,6 @@ #include "qpid/linearstore/journal/aio.h" #include "qpid/linearstore/journal/AtomicCounter.h" #include "qpid/linearstore/journal/EmptyFilePoolTypes.h" -#include <stdint.h> -#include <string> class file_hdr_t; diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp b/qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp index 9a2438ae0b..c35ec97e91 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/JournalLog.cpp @@ -20,6 +20,7 @@ */ #include "qpid/linearstore/journal/JournalLog.h" + #include <iostream> namespace qpid { diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp index 99883d2722..99b7f0f381 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp @@ -21,13 +21,9 @@ #include "qpid/linearstore/journal/LinearFileController.h" -#include <fstream> #include "qpid/linearstore/journal/EmptyFilePool.h" -#include "qpid/linearstore/journal/jcfg.h" #include "qpid/linearstore/journal/jcntl.h" #include "qpid/linearstore/journal/JournalFile.h" -#include "qpid/linearstore/journal/slock.h" -#include "qpid/linearstore/journal/utils/file_hdr.h" //#include <iostream> // DEBUG diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h index 436c07f289..61226d3015 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h +++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h @@ -23,13 +23,10 @@ #define QPID_LINEARSTORE_JOURNAL_LINEARFILECONTROLLER_H_ #include <deque> +#include "qpid/linearstore/journal/aio.h" #include "qpid/linearstore/journal/AtomicCounter.h" #include "qpid/linearstore/journal/EmptyFilePoolTypes.h" -// libaio forward declares -typedef struct io_context* io_context_t; -typedef struct iocb aio_cb; - namespace qpid { namespace linearstore { namespace journal { diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 8ae82c7b80..22241aa164 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -26,6 +26,7 @@ #include <iomanip> #include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/deq_rec.h" +#include "qpid/linearstore/journal/EmptyFilePool.h" #include "qpid/linearstore/journal/EmptyFilePoolManager.h" #include "qpid/linearstore/journal/enq_map.h" #include "qpid/linearstore/journal/enq_rec.h" diff --git a/qpid/cpp/src/qpid/linearstore/journal/aio.h b/qpid/cpp/src/qpid/linearstore/journal/aio.h index 3a4a762439..54e3401748 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/aio.h +++ b/qpid/cpp/src/qpid/linearstore/journal/aio.h @@ -24,7 +24,6 @@ #include <libaio.h> #include <cstring> -#include <string.h> namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/cvar.h b/qpid/cpp/src/qpid/linearstore/journal/cvar.h deleted file mode 100644 index 89ba8cbb13..0000000000 --- a/qpid/cpp/src/qpid/linearstore/journal/cvar.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H -#define QPID_LINEARSTORE_JOURNAL_CVAR_H - -#include <cstring> -#include "qpid/linearstore/journal/jerrno.h" -#include "qpid/linearstore/journal/jexception.h" -#include "qpid/linearstore/journal/smutex.h" -#include "qpid/linearstore/journal/time_ns.h" -#include <pthread.h> -#include <sstream> - -namespace qpid { -namespace linearstore { -namespace journal { - - // Ultra-simple thread condition variable class - class cvar - { - private: - const smutex& _sm; - pthread_cond_t _c; - public: - inline cvar(const smutex& sm) : _sm(sm) { ::pthread_cond_init(&_c, 0); } - inline ~cvar() { ::pthread_cond_destroy(&_c); } - inline void wait() - { - PTHREAD_CHK(::pthread_cond_wait(&_c, _sm.get()), "::pthread_cond_wait", "cvar", "wait"); - } - inline void timedwait(timespec& ts) - { - PTHREAD_CHK(::pthread_cond_timedwait(&_c, _sm.get(), &ts), "::pthread_cond_timedwait", "cvar", "timedwait"); - } - inline bool waitintvl(const long intvl_ns) - { - time_ns t; t.now(); t+=intvl_ns; - int ret = ::pthread_cond_timedwait(&_c, _sm.get(), &t); - if (ret == ETIMEDOUT) - return true; - PTHREAD_CHK(ret, "::pthread_cond_timedwait", "cvar", "waitintvl"); - return false; - } - inline void signal() - { - PTHREAD_CHK(::pthread_cond_signal(&_c), "::pthread_cond_signal", "cvar", "notify"); - } - inline void broadcast() - { - PTHREAD_CHK(::pthread_cond_broadcast(&_c), "::pthread_cond_broadcast", "cvar", "broadcast"); - } - }; - -}}} - -#endif // ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H diff --git a/qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp b/qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp index 830720f9d7..3952c403a1 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/data_tok.cpp @@ -22,10 +22,7 @@ #include "qpid/linearstore/journal/data_tok.h" #include <iomanip> -#include "qpid/linearstore/journal/jerrno.h" -#include "qpid/linearstore/journal/jexception.h" #include "qpid/linearstore/journal/slock.h" -#include <sstream> namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/data_tok.h b/qpid/cpp/src/qpid/linearstore/journal/data_tok.h index ff854730d2..67e0ec9683 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/data_tok.h +++ b/qpid/cpp/src/qpid/linearstore/journal/data_tok.h @@ -29,10 +29,7 @@ class data_tok; }}} #include <cassert> -#include <cstddef> #include "qpid/linearstore/journal/smutex.h" -#include <pthread.h> -#include <string> namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp index 0a9fee20f2..0da8d439af 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp @@ -23,7 +23,6 @@ #include <cassert> #include <cstring> -#include <iomanip> #include "qpid/linearstore/journal/jexception.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp index 2ccd271865..4eaaa64992 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_map.cpp @@ -21,11 +21,7 @@ #include "qpid/linearstore/journal/enq_map.h" -#include <iomanip> -#include "qpid/linearstore/journal/jerrno.h" #include "qpid/linearstore/journal/slock.h" -#include <sstream> - namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_map.h b/qpid/cpp/src/qpid/linearstore/journal/enq_map.h index 97bad07ec6..912a583ab9 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_map.h +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_map.h @@ -22,88 +22,79 @@ #ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H #define QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H -namespace qpid { -namespace linearstore { -namespace journal { -class enq_map; -}}} - -#include "qpid/linearstore/journal/jexception.h" #include "qpid/linearstore/journal/smutex.h" -#include <map> -#include <pthread.h> #include <vector> namespace qpid { namespace linearstore { namespace journal { - /** - * \class enq_map - * \brief Class for storing the physical file id (pfid) and a transaction locked flag for each enqueued - * data block using the record id (rid) as a key. This is the primary mechanism for - * deterimining the enqueue low water mark: if a pfid exists in this map, then there is - * at least one still-enqueued record in that file. (The transaction map must also be - * clear, however.) - * - * Map rids against pfid and lock status. As records are enqueued, they are added to this - * map, and as they are dequeued, they are removed. An enqueue is locked when a transactional - * dequeue is pending that has been neither committed nor aborted. - * <pre> - * key data - * - * rid1 --- [ pfid, txn_lock ] - * rid2 --- [ pfid, txn_lock ] - * rid3 --- [ pfid, txn_lock ] - * ... - * </pre> - */ - class enq_map - { - public: - // return/error codes - static short EMAP_DUP_RID; - static short EMAP_LOCKED; - static short EMAP_RID_NOT_FOUND; - static short EMAP_OK; - static short EMAP_FALSE; - static short EMAP_TRUE; +/** +* \class enq_map +* \brief Class for storing the physical file id (pfid) and a transaction locked flag for each enqueued +* data block using the record id (rid) as a key. This is the primary mechanism for +* deterimining the enqueue low water mark: if a pfid exists in this map, then there is +* at least one still-enqueued record in that file. (The transaction map must also be +* clear, however.) +* +* Map rids against pfid and lock status. As records are enqueued, they are added to this +* map, and as they are dequeued, they are removed. An enqueue is locked when a transactional +* dequeue is pending that has been neither committed nor aborted. +* <pre> +* key data +* +* rid1 --- [ pfid, txn_lock ] +* rid2 --- [ pfid, txn_lock ] +* rid3 --- [ pfid, txn_lock ] +* ... +* </pre> +*/ +class enq_map +{ +public: + // return/error codes + static short EMAP_DUP_RID; + static short EMAP_LOCKED; + static short EMAP_RID_NOT_FOUND; + static short EMAP_OK; + static short EMAP_FALSE; + static short EMAP_TRUE; - typedef struct emap_data_struct_t { - uint64_t _pfid; - std::streampos _file_posn; - bool _lock; - emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {} - emap_data_struct_t(const uint64_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {} - } emqp_data_struct_t; - typedef std::pair<uint64_t, emap_data_struct_t> emap_param; - typedef std::map<uint64_t, emap_data_struct_t> emap; - typedef emap::iterator emap_itr; + typedef struct emap_data_struct_t { + uint64_t _pfid; + std::streampos _file_posn; + bool _lock; + emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {} + emap_data_struct_t(const uint64_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {} + } emqp_data_struct_t; + typedef std::pair<uint64_t, emap_data_struct_t> emap_param; + typedef std::map<uint64_t, emap_data_struct_t> emap; + typedef emap::iterator emap_itr; - private: - emap _map; - smutex _mutex; +private: + emap _map; + smutex _mutex; - public: - enq_map(); - virtual ~enq_map(); +public: + enq_map(); + virtual ~enq_map(); - short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid; - short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid; - short get_pfid(const uint64_t rid, uint64_t& pfid); // >=0=pfid; -1=rid not found; -2=locked - short get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked - short get_file_posn(const uint64_t rid, std::streampos& file_posn); // -1=rid not found; -2=locked - short get_data(const uint64_t rid, emap_data_struct_t& eds); - bool is_enqueued(const uint64_t rid, bool ignore_lock = false); - short lock(const uint64_t rid); // 0=ok; -1=rid not found - short unlock(const uint64_t rid); // 0=ok; -1=rid not found - short is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found - inline void clear() { _map.clear(); } - inline bool empty() const { return _map.empty(); } - inline uint32_t size() const { return uint32_t(_map.size()); } - void rid_list(std::vector<uint64_t>& rv); - void pfid_list(std::vector<uint64_t>& fv); - }; + short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid; + short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid; + short get_pfid(const uint64_t rid, uint64_t& pfid); // >=0=pfid; -1=rid not found; -2=locked + short get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked + short get_file_posn(const uint64_t rid, std::streampos& file_posn); // -1=rid not found; -2=locked + short get_data(const uint64_t rid, emap_data_struct_t& eds); + bool is_enqueued(const uint64_t rid, bool ignore_lock = false); + short lock(const uint64_t rid); // 0=ok; -1=rid not found + short unlock(const uint64_t rid); // 0=ok; -1=rid not found + short is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found + inline void clear() { _map.clear(); } + inline bool empty() const { return _map.empty(); } + inline uint32_t size() const { return uint32_t(_map.size()); } + void rid_list(std::vector<uint64_t>& rv); + void pfid_list(std::vector<uint64_t>& fv); +}; }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp index f75fc2228d..9dcb2d616e 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp @@ -23,7 +23,6 @@ #include <cassert> #include <cstring> -#include <iomanip> #include "qpid/linearstore/journal/jexception.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/linearstore/journal/enums.h b/qpid/cpp/src/qpid/linearstore/journal/enums.h index 106f58cf5f..90ec355955 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enums.h +++ b/qpid/cpp/src/qpid/linearstore/journal/enums.h @@ -26,32 +26,32 @@ namespace qpid { namespace linearstore { namespace journal { - // TODO: Change this to flags, as multiple of these conditions may exist simultaneously - /** - * \brief Enumeration of possible return states from journal read and write operations. - */ - enum _iores - { - RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed noramlly. - RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO. - RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO. - RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read. - RHM_IORES_TXPENDING ///< Operation blocked by pending transaction. - }; - typedef _iores iores; +// TODO: Change this to flags, as multiple of these conditions may exist simultaneously +/** +* \brief Enumeration of possible return states from journal read and write operations. +*/ +enum _iores +{ + RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed noramlly. + RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO. + RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO. + RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read. + RHM_IORES_TXPENDING ///< Operation blocked by pending transaction. +}; +typedef _iores iores; - static inline const char* iores_str(iores res) +static inline const char* iores_str(iores res) +{ + switch (res) { - switch (res) - { - case RHM_IORES_SUCCESS: return "RHM_IORES_SUCCESS"; - case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT"; - case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT"; - case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY"; - case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING"; - } - return "<iores unknown>"; + case RHM_IORES_SUCCESS: return "RHM_IORES_SUCCESS"; + case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT"; + case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT"; + case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY"; + case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING"; } + return "<iores unknown>"; +} }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp index c36a38d1d2..942feed824 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp @@ -21,22 +21,8 @@ #include "qpid/linearstore/journal/jcntl.h" -#include <algorithm> -#include <cassert> -#include <cerrno> -#include <cstdlib> -#include <cstring> -#include <fstream> -#include <iomanip> -#include <iostream> -#include <qpid/linearstore/journal/EmptyFilePool.h> -#include <qpid/linearstore/journal/EmptyFilePoolManager.h> -#include "qpid/linearstore/journal/jerrno.h" +#include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/JournalLog.h" -#include "qpid/linearstore/journal/utils/enq_hdr.h" -#include <limits> -#include <sstream> -#include <unistd.h> namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h index 612b510cb3..c12c8afbc9 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h @@ -22,551 +22,543 @@ #ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H #define QPID_LINEARSTORE_JOURNAL_JCNTL_H -namespace qpid { -namespace linearstore { -namespace journal { - class jcntl; -}}} - -#include <cstddef> -#include <deque> #include <qpid/linearstore/journal/LinearFileController.h> #include "qpid/linearstore/journal/jdir.h" #include "qpid/linearstore/journal/RecoveryManager.h" -#include "qpid/linearstore/journal/slock.h" -#include "qpid/linearstore/journal/smutex.h" #include "qpid/linearstore/journal/wmgr.h" namespace qpid { namespace linearstore { namespace journal { - class EmptyFilePool; - class EmptyFilePoolManager; + +class EmptyFilePool; +class EmptyFilePoolManager; +class JournalLog; + +/** +* \brief Access and control interface for the journal. This is the top-level class for the +* journal. +* +* This is the top-level journal class; one instance of this class controls one instance of the +* journal and all its files and associated control structures. Besides this class, the only +* other class that needs to be used at a higher level is the data_tok class, one instance of +* which is used per data block written to the journal, and is used to track its status through +* the AIO enqueue, read and dequeue process. +*/ +class jcntl +{ +protected: + /** + * \brief Journal ID + * + * This string uniquely identifies this journal instance. It will most likely be associated + * with the identity of the message queue with which it is associated. + */ + // TODO: This is not included in any files at present, add to file_hdr? + std::string _jid; + + /** + * \brief Journal directory + * + * This string stores the path to the journal directory. It may be absolute or relative, and + * should not end in a file separator character. (e.g. "/fastdisk/jdata" is correct, + * "/fastdisk/jdata/" is not.) + */ + jdir _jdir; + + /** + * \brief Initialized flag + * + * This flag starts out set to false, is set to true once this object has been initialized, + * either by calling initialize() or recover(). + */ + bool _init_flag; + + /** + * \brief Stopped flag + * + * This flag starts out false, and is set to true when stop() is called. At this point, the + * journal will no longer accept messages until either initialize() or recover() is called. + * There is no way other than through initialization to reset this flag. + */ + // TODO: It would be helpful to distinguish between states stopping and stopped. If stop(true) is called, + // then we are stopping, but must wait for all outstanding aios to return before being finally stopped. During + // this period, however, no new enqueue/dequeue/read requests may be accepted. + bool _stop_flag; + + /** + * \brief Read-only state flag used during recover. + * + * When true, this flag prevents journal write operations (enqueue and dequeue), but + * allows read to occur. It is used during recovery, and is reset when recovered() is + * called. + */ + bool _readonly_flag; + + // Journal control structures + JournalLog& _jrnl_log; ///< Ref to Journal Log instance + LinearFileController _linearFileController; ///< Linear File Controller + EmptyFilePool* _emptyFilePoolPtr; ///< Pointer to Empty File Pool for this queue + enq_map _emap; ///< Enqueue map for low water mark management + txn_map _tmap; ///< Transaction map open transactions + wmgr _wmgr; ///< Write page manager which manages AIO + RecoveryManager _recoveryManager; ///< Recovery data used for recovery + smutex _wr_mutex; ///< Mutex for journal writes + +public: + static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns + static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing + + /** + * \brief Journal constructor. + * + * Constructor which sets the physical file location and base name. + * + * \param jid A unique identifier for this journal instance. + * \param jdir The directory which will contain the journal files. + * \param base_filename The string which will be used to start all journal filenames. + */ + jcntl(const std::string& jid, + const std::string& jdir, + JournalLog& jrnl_log); + + /** + * \brief Destructor. + */ + virtual ~jcntl(); + + inline const std::string& id() const { return _jid; } + + inline const std::string& jrnl_dir() const { return _jdir.dirname(); } + + /** + * \brief Initialize the journal for storing data. + * + * Initialize the journal by creating new journal data files and initializing internal + * control structures. When complete, the journal will be empty, and ready to store data. + * + * <b>NOTE: Any existing journal will be ignored by this operation.</b> To use recover + * the data from an existing journal, use recover(). + * + * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created + * and deleted.</b> + * + * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be + * used.</b> + * + * \param num_jfiles The number of journal files to be created. + * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically + * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then + * no files are added and an exception will be thrown if the journal runs out of file space. + * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the + * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If + * this number of files exist and the journal runs out of space, an exception will be thrown. This number + * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a + * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled. + * \param jfsize_sblks The size of each journal file expressed in softblocks. + * \param wcache_num_pages The number of write cache pages to create. + * \param wcache_pgsize_sblks The size in sblks of each write cache page. + * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL). + * + * \exception TODO + */ + void initialize(EmptyFilePool* efpp, + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, + aio_callback* const cbp); /** - * \brief Access and control interface for the journal. This is the top-level class for the - * journal. - * - * This is the top-level journal class; one instance of this class controls one instance of the - * journal and all its files and associated control structures. Besides this class, the only - * other class that needs to be used at a higher level is the data_tok class, one instance of - * which is used per data block written to the journal, and is used to track its status through - * the AIO enqueue, read and dequeue process. + * /brief Initialize journal by recovering state from previously written journal. + * + * Initialize journal by recovering state from previously written journal. The journal files + * are analyzed, and all records that have not been dequeued and that remain in the journal + * will be available for reading. The journal is placed in a read-only state until + * recovered() is called; any calls to enqueue or dequeue will fail with an exception + * in this state. + * + * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created + * and deleted.</b> + * + * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be + * used.</b> + * + * \param num_jfiles The number of journal files to be created. + * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically + * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then + * no files are added and an exception will be thrown if the journal runs out of file space. + * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the + * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If + * this number of files exist and the journal runs out of space, an exception will be thrown. This number + * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a + * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled. + * \param jfsize_sblks The size of each journal file expressed in softblocks. + * \param wcache_num_pages The number of write cache pages to create. + * \param wcache_pgsize_sblks The size in sblks of each write cache page. + * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL). + * \param prep_txn_list_ptr + * \param highest_rid Returns the highest rid found in the journal during recover + * + * \exception TODO */ - class jcntl - { - protected: - /** - * \brief Journal ID - * - * This string uniquely identifies this journal instance. It will most likely be associated - * with the identity of the message queue with which it is associated. - */ - // TODO: This is not included in any files at present, add to file_hdr? - std::string _jid; - - /** - * \brief Journal directory - * - * This string stores the path to the journal directory. It may be absolute or relative, and - * should not end in a file separator character. (e.g. "/fastdisk/jdata" is correct, - * "/fastdisk/jdata/" is not.) - */ - jdir _jdir; - - /** - * \brief Initialized flag - * - * This flag starts out set to false, is set to true once this object has been initialized, - * either by calling initialize() or recover(). - */ - bool _init_flag; - - /** - * \brief Stopped flag - * - * This flag starts out false, and is set to true when stop() is called. At this point, the - * journal will no longer accept messages until either initialize() or recover() is called. - * There is no way other than through initialization to reset this flag. - */ - // TODO: It would be helpful to distinguish between states stopping and stopped. If stop(true) is called, - // then we are stopping, but must wait for all outstanding aios to return before being finally stopped. During - // this period, however, no new enqueue/dequeue/read requests may be accepted. - bool _stop_flag; - - /** - * \brief Read-only state flag used during recover. - * - * When true, this flag prevents journal write operations (enqueue and dequeue), but - * allows read to occur. It is used during recovery, and is reset when recovered() is - * called. - */ - bool _readonly_flag; - - // Journal control structures - JournalLog& _jrnl_log; ///< Ref to Journal Log instance - LinearFileController _linearFileController; ///< Linear File Controller - EmptyFilePool* _emptyFilePoolPtr; ///< Pointer to Empty File Pool for this queue - enq_map _emap; ///< Enqueue map for low water mark management - txn_map _tmap; ///< Transaction map open transactions - wmgr _wmgr; ///< Write page manager which manages AIO - RecoveryManager _recoveryManager; ///< Recovery data used for recovery - smutex _wr_mutex; ///< Mutex for journal writes - - public: - static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns - static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing - - /** - * \brief Journal constructor. - * - * Constructor which sets the physical file location and base name. - * - * \param jid A unique identifier for this journal instance. - * \param jdir The directory which will contain the journal files. - * \param base_filename The string which will be used to start all journal filenames. - */ - jcntl(const std::string& jid, - const std::string& jdir, - JournalLog& jrnl_log); - - /** - * \brief Destructor. - */ - virtual ~jcntl(); - - inline const std::string& id() const { return _jid; } - - inline const std::string& jrnl_dir() const { return _jdir.dirname(); } - - /** - * \brief Initialize the journal for storing data. - * - * Initialize the journal by creating new journal data files and initializing internal - * control structures. When complete, the journal will be empty, and ready to store data. - * - * <b>NOTE: Any existing journal will be ignored by this operation.</b> To use recover - * the data from an existing journal, use recover(). - * - * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created - * and deleted.</b> - * - * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be - * used.</b> - * - * \param num_jfiles The number of journal files to be created. - * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically - * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then - * no files are added and an exception will be thrown if the journal runs out of file space. - * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the - * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If - * this number of files exist and the journal runs out of space, an exception will be thrown. This number - * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a - * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled. - * \param jfsize_sblks The size of each journal file expressed in softblocks. - * \param wcache_num_pages The number of write cache pages to create. - * \param wcache_pgsize_sblks The size in sblks of each write cache page. - * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL). - * - * \exception TODO - */ - void initialize(EmptyFilePool* efpp, - const uint16_t wcache_num_pages, - const uint32_t wcache_pgsize_sblks, - aio_callback* const cbp); - - /** - * /brief Initialize journal by recovering state from previously written journal. - * - * Initialize journal by recovering state from previously written journal. The journal files - * are analyzed, and all records that have not been dequeued and that remain in the journal - * will be available for reading. The journal is placed in a read-only state until - * recovered() is called; any calls to enqueue or dequeue will fail with an exception - * in this state. - * - * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created - * and deleted.</b> - * - * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be - * used.</b> - * - * \param num_jfiles The number of journal files to be created. - * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically - * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then - * no files are added and an exception will be thrown if the journal runs out of file space. - * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the - * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If - * this number of files exist and the journal runs out of space, an exception will be thrown. This number - * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a - * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled. - * \param jfsize_sblks The size of each journal file expressed in softblocks. - * \param wcache_num_pages The number of write cache pages to create. - * \param wcache_pgsize_sblks The size in sblks of each write cache page. - * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL). - * \param prep_txn_list_ptr - * \param highest_rid Returns the highest rid found in the journal during recover - * - * \exception TODO - */ - void recover(EmptyFilePoolManager* efpm, - const uint16_t wcache_num_pages, - const uint32_t wcache_pgsize_sblks, - aio_callback* const cbp, - const std::vector<std::string>* prep_txn_list_ptr, - uint64_t& highest_rid); - - /** - * \brief Notification to the journal that recovery is complete and that normal operation - * may resume. - * - * This call notifies the journal that recovery is complete and that normal operation - * may resume. The read pointers are reset so that all records read as a part of recover - * may be re-read during normal operation. The read-only flag is then reset, allowing - * enqueue and dequeue operations to resume. - * - * \exception TODO - */ - void recover_complete(); - - /** - * \brief Stops journal and deletes all journal files. - * - * Clear the journal directory of all journal files matching the base filename. - * - * \exception TODO - */ - void delete_jrnl_files(); - - /** - * \brief Enqueue data. - * - * Enqueue data or part thereof. If a large data block is being written, then it may be - * enqueued in parts by setting this_data_len to the size of the data being written in this - * call. The total data size must be known in advance, however, as this is written into the - * record header on the first record write. The state of the write (i.e. how much has been - * written so far) is maintained in the data token dtokp. Partial writes will return in state - * ENQ_PART. - * - * Note that a return value of anything other than RHM_IORES_SUCCESS implies that this write - * operation did not complete successfully or was partially completed. The action taken under - * these conditions depends on the value of the return. For example, RHM_IORES_AIO_WAIT - * implies that all pages in the write page cache are waiting for AIO operations to return, - * and that the call should be remade after waiting a bit. - * - * Example: If a write of 99 kB is divided into three equal parts, then the following states - * and returns would characterize a successful operation: - * <pre> - * dtok. dtok. dtok. - * Pperation Return wstate() dsize() written() Comment - * -----------------+--------+--------+-------+---------+------------------------------------ - * NONE 0 0 Value of dtok before op - * edr(99000, 33000) SUCCESS ENQ_PART 99000 33000 Enqueue part 1 - * edr(99000, 33000) AIO_WAIT ENQ_PART 99000 50000 Enqueue part 2, not completed - * edr(99000, 33000) SUCCESS ENQ_PART 99000 66000 Enqueue part 2 again - * edr(99000, 33000) SUCCESS ENQ 99000 99000 Enqueue part 3 - * </pre> - * - * \param data_buff Pointer to data to be enqueued for this enqueue operation. - * \param tot_data_len Total data length. - * \param this_data_len Amount to be written in this enqueue operation. - * \param dtokp Pointer to data token which contains the details of the enqueue operation. - * \param transient Flag indicating transient persistence (ie, ignored on recover). - * - * \exception TODO - */ - iores enqueue_data_record(const void* const data_buff, + void recover(EmptyFilePoolManager* efpm, + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, + aio_callback* const cbp, + const std::vector<std::string>* prep_txn_list_ptr, + uint64_t& highest_rid); + + /** + * \brief Notification to the journal that recovery is complete and that normal operation + * may resume. + * + * This call notifies the journal that recovery is complete and that normal operation + * may resume. The read pointers are reset so that all records read as a part of recover + * may be re-read during normal operation. The read-only flag is then reset, allowing + * enqueue and dequeue operations to resume. + * + * \exception TODO + */ + void recover_complete(); + + /** + * \brief Stops journal and deletes all journal files. + * + * Clear the journal directory of all journal files matching the base filename. + * + * \exception TODO + */ + void delete_jrnl_files(); + + /** + * \brief Enqueue data. + * + * Enqueue data or part thereof. If a large data block is being written, then it may be + * enqueued in parts by setting this_data_len to the size of the data being written in this + * call. The total data size must be known in advance, however, as this is written into the + * record header on the first record write. The state of the write (i.e. how much has been + * written so far) is maintained in the data token dtokp. Partial writes will return in state + * ENQ_PART. + * + * Note that a return value of anything other than RHM_IORES_SUCCESS implies that this write + * operation did not complete successfully or was partially completed. The action taken under + * these conditions depends on the value of the return. For example, RHM_IORES_AIO_WAIT + * implies that all pages in the write page cache are waiting for AIO operations to return, + * and that the call should be remade after waiting a bit. + * + * Example: If a write of 99 kB is divided into three equal parts, then the following states + * and returns would characterize a successful operation: + * <pre> + * dtok. dtok. dtok. + * Pperation Return wstate() dsize() written() Comment + * -----------------+--------+--------+-------+---------+------------------------------------ + * NONE 0 0 Value of dtok before op + * edr(99000, 33000) SUCCESS ENQ_PART 99000 33000 Enqueue part 1 + * edr(99000, 33000) AIO_WAIT ENQ_PART 99000 50000 Enqueue part 2, not completed + * edr(99000, 33000) SUCCESS ENQ_PART 99000 66000 Enqueue part 2 again + * edr(99000, 33000) SUCCESS ENQ 99000 99000 Enqueue part 3 + * </pre> + * + * \param data_buff Pointer to data to be enqueued for this enqueue operation. + * \param tot_data_len Total data length. + * \param this_data_len Amount to be written in this enqueue operation. + * \param dtokp Pointer to data token which contains the details of the enqueue operation. + * \param transient Flag indicating transient persistence (ie, ignored on recover). + * + * \exception TODO + */ + iores enqueue_data_record(const void* const data_buff, + const std::size_t tot_data_len, + const std::size_t this_data_len, + data_tok* dtokp, + const bool transient = false); + + iores enqueue_extern_data_record(const std::size_t tot_data_len, + data_tok* dtokp, + const bool transient = false); + + /** + * \brief Enqueue data. + * + * \param data_buff Pointer to data to be enqueued for this enqueue operation. + * \param tot_data_len Total data length. + * \param this_data_len Amount to be written in this enqueue operation. + * \param dtokp Pointer to data token which contains the details of the enqueue operation. + * \param xid String containing xid. An empty string (i.e. length=0) will be considered + * non-transactional. + * \param transient Flag indicating transient persistence (ie, ignored on recover). + * + * \exception TODO + */ + iores enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len, const std::size_t this_data_len, data_tok* dtokp, + const std::string& xid, const bool transient = false); - iores enqueue_extern_data_record(const std::size_t tot_data_len, + iores enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp, + const std::string& xid, const bool transient = false); - /** - * \brief Enqueue data. - * - * \param data_buff Pointer to data to be enqueued for this enqueue operation. - * \param tot_data_len Total data length. - * \param this_data_len Amount to be written in this enqueue operation. - * \param dtokp Pointer to data token which contains the details of the enqueue operation. - * \param xid String containing xid. An empty string (i.e. length=0) will be considered - * non-transactional. - * \param transient Flag indicating transient persistence (ie, ignored on recover). - * - * \exception TODO - */ - iores enqueue_txn_data_record(const void* const data_buff, - const std::size_t tot_data_len, - const std::size_t this_data_len, - data_tok* dtokp, - const std::string& xid, - const bool transient = false); - - iores enqueue_extern_txn_data_record(const std::size_t tot_data_len, - data_tok* dtokp, - const std::string& xid, - const bool transient = false); - - /** - * \brief Reads data from the journal. It is the responsibility of the reader to free - * the memory that is allocated through this call - see below for details. - * - * Reads the next non-dequeued data record from the journal. - * - * <b>Note</b> that this call allocates memory into which the data and XID are copied. It - * is the responsibility of the caller to free this memory. The memory for the data and - * XID are allocated in a single call, and the XID precedes the data in the memory space. - * Thus, where an XID exists, freeing the XID pointer will free both the XID and data memory. - * However, if an XID does not exist for the message, the XID pointer xidpp is set to NULL, - * and it is the data pointer datapp that must be freed. Should neither an XID nor data be - * present (ie an empty record), then no memory is allocated, and both pointers will be NULL. - * In this case, there is no need to free memory. - * - * TODO: Fix this lousy interface. The caller should NOT be required to clean up these - * pointers! Rather use a struct, or better still, let the data token carry the data and - * xid pointers and lengths, and have the data token both allocate and delete. - * - * \param datapp Pointer to pointer that will be set to point to memory allocated and - * containing the data. Will be set to NULL if the call fails or there is no data - * in the record. - * \param dsize Ref that will be set to the size of the data. Will be set to 0 if the call - * fails or if there is no data in the record. - * \param xidpp Pointer to pointer that will be set to point to memory allocated and - * containing the XID. Will be set to NULL if the call fails or there is no XID attached - * to this record. - * \param xidsize Ref that will be set to the size of the XID. - * \param transient Ref that will be set true if record is transient. - * \param external Ref that will be set true if record is external. In this case, the data - * pointer datapp will be set to NULL, but dsize will contain the size of the data. - * NOTE: If there is an xid, then xidpp must be freed. - * \param dtokp Pointer to data_tok instance for this data, used to track state of data - * through journal. - * \param ignore_pending_txns When false (default), if the next record to be read is locked - * by a pending transaction, the read fails with RHM_IORES_TXPENDING. However, if set - * to true, then locks are ignored. This is required for reading of the Transaction - * Prepared List (TPL) which may have its entries locked, but may be read from - * time-to-time, and needs all its records (locked and unlocked) to be available. - * - * \exception TODO - */ - iores read_data_record(void** const datapp, - std::size_t& dsize, - void** const xidpp, - std::size_t& xidsize, - bool& transient, - bool& external, - data_tok* const dtokp, - bool ignore_pending_txns = false); - - /** - * \brief Dequeues (marks as no longer needed) data record in journal. - * - * Dequeues (marks as no longer needed) data record in journal. Note that it is possible - * to use the same data token instance used to enqueue this data; it contains the record ID - * needed to correctly mark this data as dequeued in the journal. Otherwise the RID of the - * record to be dequeued and the write state of ENQ must be manually set in a new or reset - * instance of data_tok. - * - * \param dtokp Pointer to data_tok instance for this data, used to track state of data - * through journal. - * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing - * prepared XID list items, sets whether the complete() was called in commit or abort - * mode. - * - * \exception TODO - */ - iores dequeue_data_record(data_tok* const dtokp, + /** + * \brief Reads data from the journal. It is the responsibility of the reader to free + * the memory that is allocated through this call - see below for details. + * + * Reads the next non-dequeued data record from the journal. + * + * <b>Note</b> that this call allocates memory into which the data and XID are copied. It + * is the responsibility of the caller to free this memory. The memory for the data and + * XID are allocated in a single call, and the XID precedes the data in the memory space. + * Thus, where an XID exists, freeing the XID pointer will free both the XID and data memory. + * However, if an XID does not exist for the message, the XID pointer xidpp is set to NULL, + * and it is the data pointer datapp that must be freed. Should neither an XID nor data be + * present (ie an empty record), then no memory is allocated, and both pointers will be NULL. + * In this case, there is no need to free memory. + * + * TODO: Fix this lousy interface. The caller should NOT be required to clean up these + * pointers! Rather use a struct, or better still, let the data token carry the data and + * xid pointers and lengths, and have the data token both allocate and delete. + * + * \param datapp Pointer to pointer that will be set to point to memory allocated and + * containing the data. Will be set to NULL if the call fails or there is no data + * in the record. + * \param dsize Ref that will be set to the size of the data. Will be set to 0 if the call + * fails or if there is no data in the record. + * \param xidpp Pointer to pointer that will be set to point to memory allocated and + * containing the XID. Will be set to NULL if the call fails or there is no XID attached + * to this record. + * \param xidsize Ref that will be set to the size of the XID. + * \param transient Ref that will be set true if record is transient. + * \param external Ref that will be set true if record is external. In this case, the data + * pointer datapp will be set to NULL, but dsize will contain the size of the data. + * NOTE: If there is an xid, then xidpp must be freed. + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param ignore_pending_txns When false (default), if the next record to be read is locked + * by a pending transaction, the read fails with RHM_IORES_TXPENDING. However, if set + * to true, then locks are ignored. This is required for reading of the Transaction + * Prepared List (TPL) which may have its entries locked, but may be read from + * time-to-time, and needs all its records (locked and unlocked) to be available. + * + * \exception TODO + */ + iores read_data_record(void** const datapp, + std::size_t& dsize, + void** const xidpp, + std::size_t& xidsize, + bool& transient, + bool& external, + data_tok* const dtokp, + bool ignore_pending_txns = false); + + /** + * \brief Dequeues (marks as no longer needed) data record in journal. + * + * Dequeues (marks as no longer needed) data record in journal. Note that it is possible + * to use the same data token instance used to enqueue this data; it contains the record ID + * needed to correctly mark this data as dequeued in the journal. Otherwise the RID of the + * record to be dequeued and the write state of ENQ must be manually set in a new or reset + * instance of data_tok. + * + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing + * prepared XID list items, sets whether the complete() was called in commit or abort + * mode. + * + * \exception TODO + */ + iores dequeue_data_record(data_tok* const dtokp, + const bool txn_coml_commit = false); + + /** + * \brief Dequeues (marks as no longer needed) data record in journal. + * + * Dequeues (marks as no longer needed) data record in journal as part of a transaction. + * Note that it is possible to use the same data token instance used to enqueue this data; + * it contains the RID needed to correctly mark this data as dequeued in the journal. + * Otherwise the RID of the record to be dequeued and the write state of ENQ must be + * manually set in a new or reset instance of data_tok. + * + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param xid String containing xid. An empty string (i.e. length=0) will be considered + * non-transactional. + * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing + * prepared XID list items, sets whether the complete() was called in commit or abort + * mode. + * + * \exception TODO + */ + iores dequeue_txn_data_record(data_tok* const dtokp, + const std::string& xid, const bool txn_coml_commit = false); - /** - * \brief Dequeues (marks as no longer needed) data record in journal. - * - * Dequeues (marks as no longer needed) data record in journal as part of a transaction. - * Note that it is possible to use the same data token instance used to enqueue this data; - * it contains the RID needed to correctly mark this data as dequeued in the journal. - * Otherwise the RID of the record to be dequeued and the write state of ENQ must be - * manually set in a new or reset instance of data_tok. - * - * \param dtokp Pointer to data_tok instance for this data, used to track state of data - * through journal. - * \param xid String containing xid. An empty string (i.e. length=0) will be considered - * non-transactional. - * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing - * prepared XID list items, sets whether the complete() was called in commit or abort - * mode. - * - * \exception TODO - */ - iores dequeue_txn_data_record(data_tok* const dtokp, - const std::string& xid, - const bool txn_coml_commit = false); - - /** - * \brief Abort the transaction for all records enqueued or dequeued with the matching xid. - * - * Abort the transaction for all records enqueued with the matching xid. All enqueued records - * are effectively deleted from the journal, and can not be read. All dequeued records remain - * as though they had never been dequeued. - * - * \param dtokp Pointer to data_tok instance for this data, used to track state of data - * through journal. - * \param xid String containing xid. - * - * \exception TODO - */ - iores txn_abort(data_tok* const dtokp, - const std::string& xid); - - /** - * \brief Commit the transaction for all records enqueued or dequeued with the matching xid. - * - * Commit the transaction for all records enqueued with the matching xid. All enqueued - * records are effectively released for reading and dequeueing. All dequeued records are - * removed and can no longer be accessed. - * - * \param dtokp Pointer to data_tok instance for this data, used to track state of data - * through journal. - * \param xid String containing xid. - * - * \exception TODO - */ - iores txn_commit(data_tok* const dtokp, - const std::string& xid); - - /** - * \brief Check whether all the enqueue records for the given xid have reached disk. - * - * \param xid String containing xid. - * - * \exception TODO - */ - bool is_txn_synced(const std::string& xid); - - /** - * \brief Forces a check for returned AIO write events. - * - * Forces a check for returned AIO write events. This is normally performed by enqueue() and - * dequeue() operations, but if these operations cease, then this call needs to be made to - * force the processing of any outstanding AIO operations. - */ - int32_t get_wr_events(timespec* const timeout); - - /** - * \brief Stop the journal from accepting any further requests to read or write data. - * - * This operation is used to stop the journal. This is the normal mechanism for bringing the - * journal to an orderly stop. Any outstanding AIO operations or partially written pages in - * the write page cache will by flushed and will complete. - * - * <b>Note:</b> The journal cannot be restarted without either initializing it or restoring - * it. - * - * \param block_till_aio_cmpl If true, will block the thread while waiting for all - * outstanding AIO operations to complete. - */ - void stop(const bool block_till_aio_cmpl = false); - - /** - * \brief Force a flush of the write page cache, creating a single AIO write operation. - */ - iores flush(const bool block_till_aio_cmpl = false); - - inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe? - - inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); } - - uint32_t get_wr_outstanding_aio_dblks() const; - - uint32_t get_rd_outstanding_aio_dblks() const; - - LinearFileController& getLinearFileControllerRef(); - - /** - * \brief Check if a particular rid is enqueued. Note that this function will return - * false if the rid is transactionally enqueued and is not committed, or if it is - * locked (i.e. transactionally dequeued, but the dequeue has not been committed). - */ - inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); } - - inline bool is_locked(const uint64_t rid) { - if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK) - return false; - return _emap.is_locked(rid) == enq_map::EMAP_TRUE; - } - - inline void enq_rid_list(std::vector<uint64_t>& rids) { _emap.rid_list(rids); } - - inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); } - - inline uint32_t get_open_txn_cnt() const { return _tmap.size(); } - - // TODO Make this a const, but txn_map must support const first. - inline txn_map& get_txn_map() { return _tmap; } - - /** - * \brief Check if the journal is stopped. - * - * \return <b><i>true</i></b> if the jouranl is stopped; - * <b><i>false</i></b> otherwise. - */ - inline bool is_stopped() { return _stop_flag; } - - /** - * \brief Check if the journal is ready to read and write data. - * - * Checks if the journal is ready to read and write data. This function will return - * <b><i>true</i></b> if the journal has been either initialized or restored, and the stop() - * function has not been called since the initialization. - * - * Note that the journal may also be stopped if an internal error occurs (such as running out - * of data journal file space). - * - * \return <b><i>true</i></b> if the journal is ready to read and write data; - * <b><i>false</i></b> otherwise. - */ - inline bool is_ready() const { return _init_flag && !_stop_flag; } - - inline bool is_read_only() const { return _readonly_flag; } - - /** - * \brief Get the journal directory. - * - * This returns the journal directory as set during initialization. This is the directory - * into which the journal files will be written. - */ - inline const std::string& dirname() const { return _jdir.dirname(); } - - // Management instrumentation callbacks - inline virtual void instr_incr_outstanding_aio_cnt() {} - inline virtual void instr_decr_outstanding_aio_cnt() {} - - protected: - static bool _init; - static bool init_statics(); - - /** - * \brief Check status of journal before allowing write operations. - */ - void check_wstatus(const char* fn_name) const; - - /** - * \brief Check status of journal before allowing read operations. - */ - void check_rstatus(const char* fn_name) const; - - /** - * \brief Call that blocks while waiting for all outstanding AIOs to complete - */ - void aio_cmpl_wait(); - - /** - * \brief Call that blocks until at least one message returns; used to wait for - * AIO wait conditions to clear. - */ - bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp); - }; + /** + * \brief Abort the transaction for all records enqueued or dequeued with the matching xid. + * + * Abort the transaction for all records enqueued with the matching xid. All enqueued records + * are effectively deleted from the journal, and can not be read. All dequeued records remain + * as though they had never been dequeued. + * + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param xid String containing xid. + * + * \exception TODO + */ + iores txn_abort(data_tok* const dtokp, + const std::string& xid); + + /** + * \brief Commit the transaction for all records enqueued or dequeued with the matching xid. + * + * Commit the transaction for all records enqueued with the matching xid. All enqueued + * records are effectively released for reading and dequeueing. All dequeued records are + * removed and can no longer be accessed. + * + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param xid String containing xid. + * + * \exception TODO + */ + iores txn_commit(data_tok* const dtokp, + const std::string& xid); + + /** + * \brief Check whether all the enqueue records for the given xid have reached disk. + * + * \param xid String containing xid. + * + * \exception TODO + */ + bool is_txn_synced(const std::string& xid); + + /** + * \brief Forces a check for returned AIO write events. + * + * Forces a check for returned AIO write events. This is normally performed by enqueue() and + * dequeue() operations, but if these operations cease, then this call needs to be made to + * force the processing of any outstanding AIO operations. + */ + int32_t get_wr_events(timespec* const timeout); + + /** + * \brief Stop the journal from accepting any further requests to read or write data. + * + * This operation is used to stop the journal. This is the normal mechanism for bringing the + * journal to an orderly stop. Any outstanding AIO operations or partially written pages in + * the write page cache will by flushed and will complete. + * + * <b>Note:</b> The journal cannot be restarted without either initializing it or restoring + * it. + * + * \param block_till_aio_cmpl If true, will block the thread while waiting for all + * outstanding AIO operations to complete. + */ + void stop(const bool block_till_aio_cmpl = false); + + /** + * \brief Force a flush of the write page cache, creating a single AIO write operation. + */ + iores flush(const bool block_till_aio_cmpl = false); + + inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe? + + inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); } + + uint32_t get_wr_outstanding_aio_dblks() const; + + uint32_t get_rd_outstanding_aio_dblks() const; + + LinearFileController& getLinearFileControllerRef(); + + /** + * \brief Check if a particular rid is enqueued. Note that this function will return + * false if the rid is transactionally enqueued and is not committed, or if it is + * locked (i.e. transactionally dequeued, but the dequeue has not been committed). + */ + inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); } + + inline bool is_locked(const uint64_t rid) { + if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK) + return false; + return _emap.is_locked(rid) == enq_map::EMAP_TRUE; + } + + inline void enq_rid_list(std::vector<uint64_t>& rids) { _emap.rid_list(rids); } + + inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); } + + inline uint32_t get_open_txn_cnt() const { return _tmap.size(); } + + // TODO Make this a const, but txn_map must support const first. + inline txn_map& get_txn_map() { return _tmap; } + + /** + * \brief Check if the journal is stopped. + * + * \return <b><i>true</i></b> if the jouranl is stopped; + * <b><i>false</i></b> otherwise. + */ + inline bool is_stopped() { return _stop_flag; } + + /** + * \brief Check if the journal is ready to read and write data. + * + * Checks if the journal is ready to read and write data. This function will return + * <b><i>true</i></b> if the journal has been either initialized or restored, and the stop() + * function has not been called since the initialization. + * + * Note that the journal may also be stopped if an internal error occurs (such as running out + * of data journal file space). + * + * \return <b><i>true</i></b> if the journal is ready to read and write data; + * <b><i>false</i></b> otherwise. + */ + inline bool is_ready() const { return _init_flag && !_stop_flag; } + + inline bool is_read_only() const { return _readonly_flag; } + + /** + * \brief Get the journal directory. + * + * This returns the journal directory as set during initialization. This is the directory + * into which the journal files will be written. + */ + inline const std::string& dirname() const { return _jdir.dirname(); } + + // Management instrumentation callbacks + inline virtual void instr_incr_outstanding_aio_cnt() {} + inline virtual void instr_decr_outstanding_aio_cnt() {} + +protected: + static bool _init; + static bool init_statics(); + + /** + * \brief Check status of journal before allowing write operations. + */ + void check_wstatus(const char* fn_name) const; + + /** + * \brief Check status of journal before allowing read operations. + */ + void check_rstatus(const char* fn_name) const; + + /** + * \brief Call that blocks while waiting for all outstanding AIOs to complete + */ + void aio_cmpl_wait(); + + /** + * \brief Call that blocks until at least one message returns; used to wait for + * AIO wait conditions to clear. + */ + bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp); +}; }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp b/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp index 6f28a48a7f..896f44ceff 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp @@ -21,14 +21,10 @@ #include "qpid/linearstore/journal/jdir.h" -#include <cstdlib> #include <cstring> #include <cerrno> #include <iomanip> -#include "qpid/linearstore/journal/jcfg.h" -#include "qpid/linearstore/journal/jerrno.h" #include "qpid/linearstore/journal/jexception.h" -#include <sstream> #include <sys/stat.h> #include <unistd.h> diff --git a/qpid/cpp/src/qpid/linearstore/journal/jdir.h b/qpid/cpp/src/qpid/linearstore/journal/jdir.h index c13a5f5af0..86b16f8545 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jdir.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jdir.h @@ -22,13 +22,6 @@ #ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H #define QPID_LINEARSTORE_JOURNAL_JDIR_H -namespace qpid { -namespace linearstore { -namespace journal { -class jdir; -}}} - -//#include "qpid/linearstore/jrnl/jinf.h" #include <dirent.h> #include <string> #include <vector> diff --git a/qpid/cpp/src/qpid/linearstore/journal/jexception.cpp b/qpid/cpp/src/qpid/linearstore/journal/jexception.cpp index eb6e80d2b3..49f486746a 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jexception.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jexception.cpp @@ -22,8 +22,6 @@ #include "qpid/linearstore/journal/jexception.h" #include <iomanip> -#include <sstream> -#include "qpid/linearstore/journal/jerrno.h" #define CATLEN(p) MAX_MSG_SIZE - std::strlen(p) - 1 diff --git a/qpid/cpp/src/qpid/linearstore/journal/jrec.h b/qpid/cpp/src/qpid/linearstore/journal/jrec.h index 8a728bba29..7cb6df13a4 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jrec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jrec.h @@ -22,14 +22,11 @@ #ifndef QPID_LINEARSTORE_JOURNAL_JREC_H #define QPID_LINEARSTORE_JOURNAL_JREC_H -#include <cstddef> #include <fstream> #include "qpid/linearstore/journal/jcfg.h" #include <stdint.h> -#include <string> struct rec_hdr_t; -struct rec_tail_t; namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp index 11553aaf47..54755dd4ed 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/pmgr.cpp @@ -21,14 +21,6 @@ #include "qpid/linearstore/journal/pmgr.h" -#include <cerrno> -#include <cstdlib> -#include <cstring> -#include "qpid/linearstore/journal/jcfg.h" -#include "qpid/linearstore/journal/jcntl.h" -#include "qpid/linearstore/journal/jerrno.h" -#include <sstream> - namespace qpid { namespace linearstore { namespace journal { diff --git a/qpid/cpp/src/qpid/linearstore/journal/pmgr.h b/qpid/cpp/src/qpid/linearstore/journal/pmgr.h index 5de253c3d3..157a6f0566 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/pmgr.h +++ b/qpid/cpp/src/qpid/linearstore/journal/pmgr.h @@ -22,17 +22,8 @@ #ifndef QPID_LINEARSTORE_JOURNAL_PMGR_H #define QPID_LINEARSTORE_JOURNAL_PMGR_H -namespace qpid { -namespace linearstore { -namespace journal { - class pmgr; - class jcntl; -}}} - #include <deque> #include "qpid/linearstore/journal/aio.h" -#include "qpid/linearstore/journal/aio_callback.h" -#include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/deq_rec.h" #include "qpid/linearstore/journal/enq_map.h" #include "qpid/linearstore/journal/enq_rec.h" @@ -43,83 +34,86 @@ namespace qpid { namespace linearstore { namespace journal { - class JournalFile; +class aio_callback; +class data_tok; +class jcntl; +class JournalFile; +/** +* \brief Abstract class for managing either read or write page cache of arbitrary size and +* number of cache_num_pages. +*/ +class pmgr +{ +public: /** - * \brief Abstract class for managing either read or write page cache of arbitrary size and - * number of cache_num_pages. + * \brief Enumeration of possible stats of a page within a page cache. */ - class pmgr + enum page_state { - public: - /** - * \brief Enumeration of possible stats of a page within a page cache. - */ - enum page_state - { - UNUSED, ///< A page is uninitialized, contains no data. - IN_USE, ///< Page is in use. - AIO_PENDING ///< An AIO request outstanding. - }; + UNUSED, ///< A page is uninitialized, contains no data. + IN_USE, ///< Page is in use. + AIO_PENDING ///< An AIO request outstanding. + }; - /** - * \brief Page control block, carries control and state information for each page in the - * cache. - */ - struct page_cb - { - uint16_t _index; ///< Index of this page - page_state _state; ///< Status of page - uint64_t _frid; ///< First rid in page (used for fhdr init) - uint32_t _wdblks; ///< Total number of dblks in page so far - uint32_t _rdblks; ///< Total number of dblks in page - std::deque<data_tok*>* _pdtokl; ///< Page message tokens list - JournalFile* _jfp; ///< Journal file for incrementing compl counts - void* _pbuff; ///< Page buffer + /** + * \brief Page control block, carries control and state information for each page in the + * cache. + */ + struct page_cb + { + uint16_t _index; ///< Index of this page + page_state _state; ///< Status of page + uint64_t _frid; ///< First rid in page (used for fhdr init) + uint32_t _wdblks; ///< Total number of dblks in page so far + uint32_t _rdblks; ///< Total number of dblks in page + std::deque<data_tok*>* _pdtokl; ///< Page message tokens list + JournalFile* _jfp; ///< Journal file for incrementing compl counts + void* _pbuff; ///< Page buffer - page_cb(uint16_t index); ///< Convenience constructor - const char* state_str() const; ///< Return state as string for this pcb - }; + page_cb(uint16_t index); ///< Convenience constructor + const char* state_str() const; ///< Return state as string for this pcb + }; - protected: - static const uint32_t _sblkSizeBytes; ///< Disk softblock size - uint32_t _cache_pgsize_sblks; ///< Size of page cache cache_num_pages - uint16_t _cache_num_pages; ///< Number of page cache cache_num_pages - jcntl* _jc; ///< Pointer to journal controller - enq_map& _emap; ///< Ref to enqueue map - txn_map& _tmap; ///< Ref to transaction map - void* _page_base_ptr; ///< Base pointer to page memory - void** _page_ptr_arr; ///< Array of pointers to cache_num_pages in page memory - page_cb* _page_cb_arr; ///< Array of page_cb structs - aio_cb* _aio_cb_arr; ///< Array of iocb structs - aio_event* _aio_event_arr; ///< Array of io_events - io_context_t _ioctx; ///< AIO context for read/write operations - uint16_t _pg_index; ///< Index of current page being used - uint32_t _pg_cntr; ///< Page counter; determines if file rotation req'd - uint32_t _pg_offset_dblks; ///< Page offset (used so far) in data blocks - uint32_t _aio_evt_rem; ///< Remaining AIO events - aio_callback* _cbp; ///< Pointer to callback object +protected: + static const uint32_t _sblkSizeBytes; ///< Disk softblock size + uint32_t _cache_pgsize_sblks; ///< Size of page cache cache_num_pages + uint16_t _cache_num_pages; ///< Number of page cache cache_num_pages + jcntl* _jc; ///< Pointer to journal controller + enq_map& _emap; ///< Ref to enqueue map + txn_map& _tmap; ///< Ref to transaction map + void* _page_base_ptr; ///< Base pointer to page memory + void** _page_ptr_arr; ///< Array of pointers to cache_num_pages in page memory + page_cb* _page_cb_arr; ///< Array of page_cb structs + aio_cb* _aio_cb_arr; ///< Array of iocb structs + aio_event* _aio_event_arr; ///< Array of io_events + io_context_t _ioctx; ///< AIO context for read/write operations + uint16_t _pg_index; ///< Index of current page being used + uint32_t _pg_cntr; ///< Page counter; determines if file rotation req'd + uint32_t _pg_offset_dblks; ///< Page offset (used so far) in data blocks + uint32_t _aio_evt_rem; ///< Remaining AIO events + aio_callback* _cbp; ///< Pointer to callback object - enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding - deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding - txn_rec _txn_rec; ///< Transaction record used for encoding/decoding + enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding + deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding + txn_rec _txn_rec; ///< Transaction record used for encoding/decoding - public: - pmgr(jcntl* jc, enq_map& emap, txn_map& tmap); - virtual ~pmgr(); +public: + pmgr(jcntl* jc, enq_map& emap, txn_map& tmap); + virtual ~pmgr(); - virtual int32_t get_events(timespec* const timeout, bool flush) = 0; - inline uint32_t get_aio_evt_rem() const { return _aio_evt_rem; } - static const char* page_state_str(page_state ps); - inline uint32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; } - inline uint16_t cache_num_pages() const { return _cache_num_pages; } + virtual int32_t get_events(timespec* const timeout, bool flush) = 0; + inline uint32_t get_aio_evt_rem() const { return _aio_evt_rem; } + static const char* page_state_str(page_state ps); + inline uint32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; } + inline uint16_t cache_num_pages() const { return _cache_num_pages; } - protected: - virtual void initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks, - const uint16_t cache_num_pages); - virtual void rotate_page() = 0; - virtual void clean(); - }; +protected: + virtual void initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks, + const uint16_t cache_num_pages); + virtual void rotate_page() = 0; + virtual void clean(); +}; }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/slock.h b/qpid/cpp/src/qpid/linearstore/journal/slock.h index 17ed06dfce..12e9e2d08c 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/slock.h +++ b/qpid/cpp/src/qpid/linearstore/journal/slock.h @@ -22,7 +22,6 @@ #ifndef QPID_LINEARSTORE_JOURNAL_SLOCK_H #define QPID_LINEARSTORE_JOURNAL_SLOCK_H -#include "qpid/linearstore/journal/jexception.h" #include "qpid/linearstore/journal/smutex.h" #include <pthread.h> @@ -30,42 +29,42 @@ namespace qpid { namespace linearstore { namespace journal { - // Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope - class slock +// Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope +class slock +{ +protected: + const smutex& _sm; +public: + inline slock(const smutex& sm) : _sm(sm) { - protected: - const smutex& _sm; - public: - inline slock(const smutex& sm) : _sm(sm) - { - PTHREAD_CHK(::pthread_mutex_lock(_sm.get()), "::pthread_mutex_lock", "slock", "slock"); - } - inline ~slock() - { - PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "slock", "~slock"); - } - }; + PTHREAD_CHK(::pthread_mutex_lock(_sm.get()), "::pthread_mutex_lock", "slock", "slock"); + } + inline ~slock() + { + PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "slock", "~slock"); + } +}; - // Ultra-simple scoped try-lock class, auto-releases mutex when it goes out-of-scope - class stlock +// Ultra-simple scoped try-lock class, auto-releases mutex when it goes out-of-scope +class stlock +{ +protected: + const smutex& _sm; + bool _locked; +public: + inline stlock(const smutex& sm) : _sm(sm), _locked(false) + { + int ret = ::pthread_mutex_trylock(_sm.get()); + _locked = (ret == 0); // check if lock obtained + if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "::pthread_mutex_trylock", "stlock", "stlock"); + } + inline ~stlock() { - protected: - const smutex& _sm; - bool _locked; - public: - inline stlock(const smutex& sm) : _sm(sm), _locked(false) - { - int ret = ::pthread_mutex_trylock(_sm.get()); - _locked = (ret == 0); // check if lock obtained - if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "::pthread_mutex_trylock", "stlock", "stlock"); - } - inline ~stlock() - { - if (_locked) - PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "stlock", "~stlock"); - } - inline bool locked() const { return _locked; } - }; + if (_locked) + PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "stlock", "~stlock"); + } + inline bool locked() const { return _locked; } +}; }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp index 6d84c6c451..5e0a28814d 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp @@ -21,11 +21,7 @@ #include "qpid/linearstore/journal/txn_map.h" -#include <iomanip> -#include "qpid/linearstore/journal/jerrno.h" -#include "qpid/linearstore/journal/jexception.h" #include "qpid/linearstore/journal/slock.h" -#include <sstream> namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h index ef01e1df92..0f8cd5f3d7 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h @@ -22,16 +22,8 @@ #ifndef QPID_LINEARSTORE_JOURNAL_TXN_MAP_H #define QPID_LINEARSTORE_JOURNAL_TXN_MAP_H -namespace qpid { -namespace linearstore { -namespace journal { - class txn_map; -}}} - #include "qpid/linearstore/journal/smutex.h" #include <map> -#include <pthread.h> -#include <string> #include <vector> namespace qpid { diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp index ed4853ba1b..37448f2a8d 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp @@ -23,7 +23,6 @@ #include <cassert> #include <cstring> -#include <iomanip> #include "qpid/linearstore/journal/jexception.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp index 9af2764ab5..e308f4ab06 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -22,16 +22,12 @@ #include "qpid/linearstore/journal/wmgr.h" #include <cassert> -#include <cerrno> -#include <cstdlib> -#include <cstring> -#include "qpid/linearstore/journal/utils/file_hdr.h" -#include "qpid/linearstore/journal/jcfg.h" +#include "qpid/linearstore/journal/aio_callback.h" +#include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/jcntl.h" -#include "qpid/linearstore/journal/jerrno.h" #include "qpid/linearstore/journal/JournalFile.h" -#include <sstream> -#include <stdint.h> +#include "qpid/linearstore/journal/LinearFileController.h" +#include "qpid/linearstore/journal/utils/file_hdr.h" //#include <iostream> // DEBUG diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h index c55c0db577..0aa21ca545 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h @@ -22,20 +22,10 @@ #ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H #define QPID_LINEARSTORE_JOURNAL_WMGR_H -namespace qpid { -namespace linearstore { -namespace journal { -class wmgr; -}}} - -#include <cstring> -#include "qpid/linearstore/journal/EmptyFilePoolTypes.h" #include "qpid/linearstore/journal/enums.h" #include "qpid/linearstore/journal/pmgr.h" #include <set> -class file_hdr_t; - namespace qpid { namespace linearstore { namespace journal { |
