From fcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 20 Sep 2013 16:43:10 +0000 Subject: QPID-4984: WIP - compiles, but not functional. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525050 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/linearstore.cmake | 13 +- qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp | 2 +- .../qpid/linearstore/EmptyFilePoolManagerImpl.cpp | 59 ++ .../qpid/linearstore/EmptyFilePoolManagerImpl.h | 40 ++ qpid/cpp/src/qpid/linearstore/JournalImpl.cpp | 72 ++- qpid/cpp/src/qpid/linearstore/JournalImpl.h | 47 +- qpid/cpp/src/qpid/linearstore/Log.h | 30 - qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 690 ++++++++++++--------- qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h | 41 +- qpid/cpp/src/qpid/linearstore/QpidLog.h | 30 + qpid/cpp/src/qpid/linearstore/StorePlugin.cpp | 2 +- .../src/qpid/linearstore/jrnl/EmptyFilePool.cpp | 255 ++++++++ qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h | 85 +++ .../qpid/linearstore/jrnl/EmptyFilePoolManager.cpp | 175 ++++++ .../qpid/linearstore/jrnl/EmptyFilePoolManager.h | 63 ++ .../linearstore/jrnl/EmptyFilePoolPartition.cpp | 134 ++++ .../qpid/linearstore/jrnl/EmptyFilePoolPartition.h | 72 +++ .../src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h | 37 ++ qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp | 54 ++ qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h | 46 ++ .../linearstore/jrnl/JournalFileController.cpp | 142 +++++ .../qpid/linearstore/jrnl/JournalFileController.h | 68 ++ qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp | 62 ++ qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h | 56 ++ qpid/cpp/src/qpid/linearstore/jrnl/enums.h | 12 +- qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h | 5 +- qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp | 53 +- qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h | 24 +- qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp | 76 ++- qpid/cpp/src/qpid/linearstore/jrnl/jdir.h | 29 +- qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp | 102 +-- qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h | 7 + qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp | 4 +- .../cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c | 57 +- .../cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h | 46 +- qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp | 38 +- qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h | 6 +- .../cpp/src/qpid/linearstore/management-schema.xml | 26 +- 38 files changed, 2179 insertions(+), 581 deletions(-) create mode 100644 qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp create mode 100644 qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h delete mode 100644 qpid/cpp/src/qpid/linearstore/Log.h create mode 100644 qpid/cpp/src/qpid/linearstore/QpidLog.h create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp create mode 100644 qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/linearstore.cmake b/qpid/cpp/src/linearstore.cmake index ddf70c75b9..5377acd207 100644 --- a/qpid/cpp/src/linearstore.cmake +++ b/qpid/cpp/src/linearstore.cmake @@ -80,12 +80,18 @@ if (BUILD_LINEARSTORE) qpid/linearstore/jrnl/deq_rec.cpp qpid/linearstore/jrnl/enq_map.cpp qpid/linearstore/jrnl/enq_rec.cpp + qpid/linearstore/jrnl/EmptyFilePool.cpp + qpid/linearstore/jrnl/EmptyFilePoolManager.cpp + qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp #qpid/linearstore/jrnl/fcntl.cpp qpid/linearstore/jrnl/jcntl.cpp qpid/linearstore/jrnl/jdir.cpp qpid/linearstore/jrnl/jerrno.cpp qpid/linearstore/jrnl/jexception.cpp #qpid/linearstore/jrnl/jinf.cpp + qpid/linearstore/jrnl/JournalFile.cpp + qpid/linearstore/jrnl/JournalFileController.cpp + qpid/linearstore/jrnl/JournalLog.cpp qpid/linearstore/jrnl/jrec.cpp #qpid/linearstore/jrnl/lp_map.cpp #qpid/linearstore/jrnl/lpmgr.cpp @@ -106,6 +112,7 @@ if (BUILD_LINEARSTORE) qpid/linearstore/BindingDbt.cpp qpid/linearstore/BufferValue.cpp qpid/linearstore/DataTokenImpl.cpp + qpid/linearstore/EmptyFilePoolManagerImpl.cpp qpid/linearstore/IdDbt.cpp qpid/linearstore/IdSequence.cpp qpid/linearstore/JournalImpl.cpp @@ -118,9 +125,9 @@ if (BUILD_LINEARSTORE) qpid/linearstore/jrnl/utils/deq_hdr.c qpid/linearstore/jrnl/utils/enq_hdr.c qpid/linearstore/jrnl/utils/file_hdr.c - qpid/linearstore/jrnl/utils/rec_hdr.c - qpid/linearstore/jrnl/utils/rec_tail.c - qpid/linearstore/jrnl/utils/txn_hdr.c + qpid/linearstore/jrnl/utils/rec_hdr.c + qpid/linearstore/jrnl/utils/rec_tail.c + qpid/linearstore/jrnl/utils/txn_hdr.c ) # linearstore include directories diff --git a/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp index 3a11817d1e..204affd1d1 100644 --- a/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp +++ b/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp @@ -72,7 +72,7 @@ rmgr::initialize(aio_callback* const cbp) throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize"); } _fhdr_aio_cb_ptr = new aio_cb; - std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*)); + std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb)); } void diff --git a/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp new file mode 100644 index 0000000000..37e3922846 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "EmptyFilePoolManagerImpl.h" + +#include "QpidLog.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" + +namespace qpid { +namespace linearstore { + +EmptyFilePoolManagerImpl::EmptyFilePoolManagerImpl(const std::string& qlsStorePath) : + qpid::qls_jrnl::EmptyFilePoolManager(qlsStorePath) +{} + +EmptyFilePoolManagerImpl::~EmptyFilePoolManagerImpl() {} + +void EmptyFilePoolManagerImpl::findEfpPartitions() { + qpid::qls_jrnl::EmptyFilePoolManager::findEfpPartitions(); + QLS_LOG(info, "EFP Manager initialization complete"); + std::vector partitionList; + std::vector filePoolList; + getEfpPartitions(partitionList); + if (partitionList.size() == 0) { + QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.") + } else { + QLS_LOG(info, "> EFP Partitions found: " << partitionList.size()); + for (std::vector::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) { + filePoolList.clear(); + (*i)->getEmptyFilePools(filePoolList); + QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" << + (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'"); + for (std::vector::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) { + QLS_LOG(info, " - EFP \'" << (*j)->fileSizeKib() << "k\' containing " << (*j)->numEmptyFiles() << + " files of size " << (*j)->fileSizeKib() << " KiB totaling " << (*j)->cumFileSizeKib() << " KiB"); + } + } + } +} + +}} /* namespace qpid::linearstore */ diff --git a/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h new file mode 100644 index 0000000000..f9087b2fbc --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_ +#define QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_ + +#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h" + +namespace qpid { +namespace linearstore { + +class EmptyFilePoolManagerImpl: public qpid::qls_jrnl::EmptyFilePoolManager +{ +public: + EmptyFilePoolManagerImpl(const std::string& qlsStorePath); + virtual ~EmptyFilePoolManagerImpl(); + void findEfpPartitions(); +}; + +}} /* namespace qpid::linearstore */ + +#endif /* QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index f8b7777269..3fdfb24592 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -23,16 +23,17 @@ #include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/jexception.h" +#include "qpid/linearstore/jrnl/EmptyFilePool.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" -#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h" +//#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h" #include "qmf/org/apache/qpid/linearstore/EventCreated.h" #include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h" #include "qmf/org/apache/qpid/linearstore/EventFull.h" #include "qmf/org/apache/qpid/linearstore/EventRecovered.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" -#include "qpid/linearstore/Log.h" +#include "qpid/linearstore/QpidLog.h" #include "qpid/linearstore/StoreException.h" using namespace qpid::qls_jrnl; @@ -53,22 +54,23 @@ void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); i JournalImpl::JournalImpl(qpid::sys::Timer& timer_, const std::string& journalId, const std::string& journalDirectory, - const std::string& journalBaseFilename, +// const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* a, DeleteCallback onDelete): - jcntl(journalId, journalDirectory, journalBaseFilename), + jcntl(journalId, journalDirectory/*, journalBaseFilename*/), timer(timer_), getEventsTimerSetFlag(false), - lastReadRid(0), + efpp(0), +// lastReadRid(0), writeActivityFlag(false), flushTriggeredFlag(true), - _xidp(0), - _datap(0), - _dlen(0), - _dtok(), - _external(false), +// _xidp(0), +// _datap(0), +// _dlen(0), +// _dtok(), +// _external(false), deleteCallback(onDelete) { getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); @@ -82,7 +84,7 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_, QLS_LOG2(notice, _jid, "Created"); std::ostringstream oss; - oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\""; + oss << "Journal directory = \"" << journalDirectory << "\""; QLS_LOG2(debug, _jid, oss.str()); } @@ -95,7 +97,7 @@ JournalImpl::~JournalImpl() } getEventsFireEventsPtr->cancel(); inactivityFireEventPtr->cancel(); - free_read_buffers(); +// free_read_buffers(); if (_mgmtObject.get() != 0) { _mgmtObject->resourceDestroy(); @@ -116,14 +118,14 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) _mgmtObject->set_name(_jid); _mgmtObject->set_directory(_jdir.dirname()); - _mgmtObject->set_baseFileName(_base_filename); +// _mgmtObject->set_baseFileName(_base_filename); _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); _mgmtObject->set_readPages(JRNL_RMGR_PAGES); // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime - _mgmtObject->set_initialFileCount(0); - _mgmtObject->set_dataFileSize(0); - _mgmtObject->set_currentFileCount(0); + //_mgmtObject->set_initialFileCount(0); + //_mgmtObject->set_dataFileSize(0); + //_mgmtObject->set_currentFileCount(0); _mgmtObject->set_writePageSize(0); _mgmtObject->set_writePages(0); @@ -133,22 +135,23 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) void -JournalImpl::initialize(/*const uint16_t num_jfiles, - const bool auto_expand, - const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ +JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp) { - std::ostringstream oss; -// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; - oss << "Initialize;"; - oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; - oss << " wcache_num_pages=" << wcache_num_pages; - QLS_LOG2(debug, _jid, oss.str()); - jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp); - QLS_LOG2(debug, _jid, "Initialization complete"); + efpp = efpp_; +// efpp->createJournal(_jdir); +// QLS_LOG2(notice, _jid, "Initialized"); +// std::ostringstream oss; +//// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; +// oss << "Initialize; efpPartitionNumber=" << efpp_->getPartitionNumber(); +// oss << " efpFileSizeKb=" << efpp_->fileSizeKib(); +// oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; +// oss << " wcache_num_pages=" << wcache_num_pages; +// QLS_LOG2(debug, _jid, oss.str()); + jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp, wcache_num_pages, wcache_pgsize_sblks, cbp); +// QLS_LOG2(debug, _jid, "Initialization complete"); // TODO: replace for linearstore: _lpmgr /* if (_mgmtObject.get() != 0) @@ -261,6 +264,7 @@ JournalImpl::recover_complete() //#define AIO_SLEEP_TIME_US 10 // 0.01 ms // Return true if content is recovered from store; false if content is external and must be recovered from an external store. // Throw exception for all errors. +/* bool JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset) { @@ -351,6 +355,7 @@ JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size } return true; } +*/ void JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len, @@ -574,6 +579,7 @@ void JournalImpl::rd_aio_cb(std::vector& /*pil*/) {} +/* void JournalImpl::free_read_buffers() { @@ -586,6 +592,12 @@ JournalImpl::free_read_buffers() _datap = 0; } } +*/ + +void +JournalImpl::createStore() { + +} void JournalImpl::handleIoResult(const iores r) @@ -624,12 +636,13 @@ JournalImpl::handleIoResult(const iores r) } } -qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId, +qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/, qpid::management::Args& /*args*/, std::string& /*text*/) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; +/* switch (methodId) { case _qmf::Journal::METHOD_EXPAND : @@ -640,6 +653,7 @@ qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t m status = Manageable::STATUS_NOT_IMPLEMENTED; break; } +*/ return status; } diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index 692bccc9b0..264162e5bb 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -19,11 +19,12 @@ * */ -#ifndef QPID_LEGACYSTORE_JOURNALIMPL_H -#define QPID_LEGACYSTORE_JOURNALIMPL_H +#ifndef QPID_LINEARSTORE_JOURNALIMPL_H +#define QPID_LINEARSTORE_JOURNALIMPL_H #include #include "qpid/linearstore/jrnl/enums.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" #include "qpid/linearstore/jrnl/jcntl.h" #include "qpid/linearstore/DataTokenImpl.h" #include "qpid/linearstore/PreparedTransaction.h" @@ -35,12 +36,15 @@ #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/linearstore/Journal.h" -namespace qpid { +namespace qpid{ + namespace sys { class Timer; -}} +} +namespace qls_jrnl { +class EmptyFilePool; +} -namespace qpid{ namespace linearstore{ class JournalImpl; @@ -83,20 +87,21 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr boost::intrusive_ptr getEventsFireEventsPtr; qpid::sys::Mutex _getf_lock; qpid::sys::Mutex _read_lock; + qpid::qls_jrnl::EmptyFilePool* efpp; - uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests - std::vector oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence +// uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests +// std::vector oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence bool writeActivityFlag; bool flushTriggeredFlag; boost::intrusive_ptr inactivityFireEventPtr; // temp local vars for loadMsgContent below - void* _xidp; - void* _datap; - size_t _dlen; - qpid::qls_jrnl::data_tok _dtok; - bool _external; +// void* _xidp; +// void* _datap; +// size_t _dlen; +// qpid::qls_jrnl::data_tok _dtok; +// bool _external; qpid::management::ManagementAgent* _agent; qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject; @@ -107,7 +112,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr JournalImpl(qpid::sys::Timer& timer, const std::string& journalId, const std::string& journalDirectory, - const std::string& journalBaseFilename, +// const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* agent, @@ -121,6 +126,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr const bool auto_expand, const uint16_t ae_max_jfiles, const uint32_t jfsize_sblks,*/ + qpid::qls_jrnl::EmptyFilePool* efp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp); @@ -129,10 +135,10 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr const bool auto_expand, const uint16_t ae_max_jfiles, const uint32_t jfsize_sblks,*/ + qpid::qls_jrnl::EmptyFilePool* efp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks) { - initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, - this); + initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efp, wcache_num_pages, wcache_pgsize_sblks, this); } void recover(/*const uint16_t num_jfiles, @@ -164,7 +170,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr // Temporary fn to read and save last msg read from journal so it can be assigned // in chunks. To be replaced when coding to do this direct from the journal is ready. // Returns true if the record is extern, false if local. - bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0); +// bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0); // Overrides for write inactivity timer void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, @@ -216,7 +222,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void resetDeleteCallback() { deleteCallback = DeleteCallback(); } private: - void free_read_buffers(); +// void free_read_buffers(); + void createStore(); inline void setGetEventTimer() { @@ -242,11 +249,11 @@ class TplJournalImpl : public JournalImpl TplJournalImpl(qpid::sys::Timer& timer, const std::string& journalId, const std::string& journalDirectory, - const std::string& journalBaseFilename, +// const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* agent) : - JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent) + JournalImpl(timer, journalId, journalDirectory/*, journalBaseFilename*/, getEventsTimeout, flushTimeout, agent) {} virtual ~TplJournalImpl() {} @@ -263,4 +270,4 @@ class TplJournalImpl : public JournalImpl } // namespace msgstore } // namespace mrg -#endif // ifndef QPID_LEGACYSTORE_JOURNALIMPL_H +#endif // ifndef QPID_LINEARSTORE_JOURNALIMPL_H diff --git a/qpid/cpp/src/qpid/linearstore/Log.h b/qpid/cpp/src/qpid/linearstore/Log.h deleted file mode 100644 index b03ea7ac9d..0000000000 --- a/qpid/cpp/src/qpid/linearstore/Log.h +++ /dev/null @@ -1,30 +0,0 @@ - /* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef QPID_LEGACYSTORE_LOG_H -#define QPID_LEGACYSTORE_LOG_H - -#include "qpid/log/Statement.h" - -#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg) -#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg) - -#endif // QPID_LEGACYSTORE_LOG_H diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index f4b9a40455..5372963bc2 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -21,17 +21,19 @@ #include "qpid/linearstore/MessageStoreImpl.h" +#include "db-inc.h" #include "qpid/broker/QueueSettings.h" #include "qpid/linearstore/BindingDbt.h" #include "qpid/linearstore/BufferValue.h" #include "qpid/linearstore/IdDbt.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h" #include "qpid/linearstore/jrnl/txn_map.h" -#include "qpid/linearstore/Log.h" +#include "qpid/linearstore/QpidLog.h" #include "qpid/framing/FieldValue.h" #include "qmf/org/apache/qpid/linearstore/Package.h" #include "qpid/linearstore/StoreException.h" #include -#include + #define MAX_AIO_SLEEPS 100000 // tot: ~1 sec #define AIO_SLEEP_TIME_US 10 // 0.01 ms @@ -43,53 +45,46 @@ namespace linearstore{ const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name + // FIXME aconway 2010-03-09: was 10 qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s qpid::sys::Mutex TxnCtxt::globalSerialiser; -MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t _rid, - const bool _deq_flag, - const bool _commit_flag, - const bool _tpc_flag) : - rid(_rid), - deq_flag(_deq_flag), - commit_flag(_commit_flag), - tpc_flag(_tpc_flag) +MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_, + const bool deq_flag_, + const bool commit_flag_, + const bool tpc_flag_) : + rid(rid_), + deq_flag(deq_flag_), + commit_flag(commit_flag_), + tpc_flag(tpc_flag_) {} -MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) : - numJrnlFiles(0), - autoJrnlExpand(false), - autoJrnlExpandMaxFiles(0), - jrnlFsizeSblks(0), +MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) : + defaultEfpPartitionNumber(0), + defaultEfpFileSizeKib(0), truncateFlag(false), wCachePgSizeSblks(0), wCacheNumPages(0), - tplNumJrnlFiles(0), - tplJrnlFsizeSblks(0), tplWCachePgSizeSblks(0), tplWCacheNumPages(0), highestRid(0), isInit(false), - envPath(envpath), + envPath(envpath_), broker(broker_), mgmtObject(), agent(0) {} -uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param, const std::string paramName/*, const uint16_t jrnlFsizePgs*/) +uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const std::string& paramName_) { - uint32_t p = param; + uint32_t p = param_; -/* if (jrnlFsizePgs == 1 && p > 64 ) { - p = 64; - QLS_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")"); - } - else*/ if (p == 0) { + if (p == 0) { // For zero value, use default p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024; - QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); + QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); } else if ( p > 128 || (p & (p-1)) ) { // For any positive value that is not a power of 2, use closest value if (p < 6) p = 4; @@ -98,16 +93,16 @@ uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param, const st else if (p < 48) p = 32; else if (p < 96) p = 64; else p = 128; - QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")"); + QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")"); } return p; } -uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib) +uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_) { - uint32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + uint32_t wrPageSizeSblks = wrPageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks uint32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB). - switch (wrPageSizeKib) + switch (wrPageSizeKib_) { case 1: case 2: @@ -124,6 +119,29 @@ uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib) } } +qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition_, + const std::string& /*paramName_*/) { + // TODO: check against list of existing partitions, throw if not found + return partition_; +} + +qpid::qls_jrnl::efpFileSizeKib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_, + const std::string& paramName_) { + uint8_t rem = efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB); + if (rem != 0) { + uint64_t newVal = efpFileSizeKib_ - rem; + if (rem >= (JRNL_SBLK_SIZE_KIB / 2)) + newVal += JRNL_SBLK_SIZE_KIB; + QLS_LOG(warning, "Parameter " << paramName_ << " (" << efpFileSizeKib_ << ") must be a multiple of " << + JRNL_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" << + newVal << ")"); + return newVal; + } + return efpFileSizeKib_; + + // TODO: check against list of existing pools in the given partition +} + void MessageStoreImpl::initManagement () { if (broker != 0) { @@ -134,15 +152,10 @@ void MessageStoreImpl::initManagement () new _qmf::Store(agent, this, broker)); mgmtObject->set_location(storeDir); - mgmtObject->set_defaultInitialFileCount(numJrnlFiles); - mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE); mgmtObject->set_tplWritePages(tplWCacheNumPages); - mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles); - mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE); - mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles); agent->addObject(mgmtObject, 0, true); @@ -154,61 +167,58 @@ void MessageStoreImpl::initManagement () } } -bool MessageStoreImpl::init(const qpid::Options* options) +bool MessageStoreImpl::init(const qpid::Options* options_) { // Extract and check options - const StoreOptions* opts = static_cast(options); + const StoreOptions* opts = static_cast(options_); + qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition"); + qpid::qls_jrnl::efpFileSizeKib_t efpFilePoolSize = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); // Pass option values to init() - return init(opts->storeDir, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib); + return init(opts->storeDir, efpPartition, efpFilePoolSize, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib); } // These params, taken from options, are assumed to be correct and verified -bool MessageStoreImpl::init(const std::string& dir, +bool MessageStoreImpl::init(const std::string& storeDir_, /*uint16_t jfiles, uint32_t jfileSizePgs,*/ - const bool truncateFlag, - uint32_t wCachePageSizeKib, + qpid::qls_jrnl::efpPartitionNumber_t efpPartition_, + qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_, + const bool truncateFlag_, + uint32_t wCachePageSizeKib_, /*uint16_t tplJfiles, uint32_t tplJfileSizePgs,*/ - uint32_t tplWCachePageSizeKib/*, - bool autoJExpand, - uint16_t autoJExpandMaxFiles*/) + uint32_t tplWCachePageSizeKib_) + /*bool autoJExpand, + uint16_t autoJExpandMaxFiles)*/ { if (isInit) return true; // Set geometry members (converting to correct units where req'd) -// numJrnlFiles = jfiles; -// jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE; - wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks - wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib); -// tplNumJrnlFiles = tplJfiles; -// tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE; - tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks - tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib); -// autoJrnlExpand = autoJExpand; -// autoJrnlExpandMaxFiles = autoJExpandMaxFiles; - if (dir.size()>0) storeDir = dir; - - if (truncateFlag) - truncateInit(false); + defaultEfpPartitionNumber = efpPartition_; + defaultEfpFileSizeKib = efpFileSizeKib_; + wCachePgSizeSblks = wCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_); + tplWCachePgSizeSblks = tplWCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_); + if (storeDir_.size()>0) storeDir = storeDir_; + + if (truncateFlag_) + truncateInit(); else init(); - QLS_LOG(notice, "Store module initialized; store-dir=" << dir); -// QLS_LOG(info, "> Default files per journal: " << jfiles); -// TODO: Uncomment these lines when auto-expand is enabled. -// QLS_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled")); -// if (autoJrnlExpand) QLS_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles); -// QLS_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)"); - QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)"); + QLS_LOG(notice, "Store module initialized; store-dir=" << storeDir_); + QLS_LOG(info, "> Default EFP partition: " << defaultEfpPartitionNumber); + QLS_LOG(info, "> Default EFP file size: " << defaultEfpFileSizeKib << " (KiB)"); + QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib_ << " (KiB)"); QLS_LOG(info, "> Default number of write cache pages: " << wCacheNumPages); -// QLS_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles); -// QLS_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)"); - QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)"); + QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib_ << " (KiB)"); QLS_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages); + QLS_LOG(info, "> EFP partition: " << defaultEfpPartitionNumber); + QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSizeKib << " (KiB)"); return isInit; } @@ -263,7 +273,7 @@ void MessageStoreImpl::init() // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure. // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026 - tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); + tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), /*"tpl",*/ defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); isInit = true; } catch (const DbException& e) { if (e.get_errno() == DB_VERSION_MISMATCH) @@ -286,6 +296,9 @@ void MessageStoreImpl::init() throw; } } while (!isInit); + + efpMgr.reset(new EmptyFilePoolManagerImpl(getStoreTopLevelDir())); + efpMgr->findEfpPartitions(); } void MessageStoreImpl::finalize() @@ -307,7 +320,7 @@ void MessageStoreImpl::finalize() } } -void MessageStoreImpl::truncateInit(const bool saveStoreContent) +void MessageStoreImpl::truncateInit() { if (isInit) { { @@ -324,15 +337,10 @@ void MessageStoreImpl::truncateInit(const bool saveStoreContent) dbenv->close(0); isInit = false; } - std::ostringstream oss; - oss << storeDir << "/" << storeTopLevelDir; - if (saveStoreContent) { - std::string dir = qpid::qls_jrnl::jdir::push_down(storeDir, storeTopLevelDir, "cluster"); - QLS_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << "."); - } else { - qpid::qls_jrnl::jdir::delete_dir(oss.str().c_str()); - QLS_LOG(notice, "Store directory " << oss.str() << " was truncated."); - } + qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir()); + qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir()); + qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir()); + QLS_LOG(notice, "Store directory " << getStoreTopLevelDir() << " was truncated."); init(); } @@ -342,18 +350,18 @@ void MessageStoreImpl::chkTplStoreInit() qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { qpid::qls_jrnl::jdir::create_dir(getTplBaseDir()); - tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCacheNumPages, tplWCachePgSizeSblks); + tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSizeKib), tplWCacheNumPages, tplWCachePgSizeSblks); if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } } -void MessageStoreImpl::open(db_ptr db, - DbTxn* txn, - const char* file, - bool dupKey) +void MessageStoreImpl::open(db_ptr db_, + DbTxn* txn_, + const char* file_, + bool dupKey_) { - if(dupKey) db->set_flags(DB_DUPSORT); - db->open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0); + if(dupKey_) db_->set_flags(DB_DUPSORT); + db_->open(txn_, file_, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0); } void MessageStoreImpl::closeDbs() @@ -385,15 +393,15 @@ MessageStoreImpl::~MessageStoreImpl() } } -void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, - const qpid::framing::FieldTable& /*args*/) +void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, + const qpid::framing::FieldTable& args_) { + QLS_LOG(info, "*** MessageStoreImpl::create() queue=\"" << queue_.getName() << "\""); // DEBUG checkInit(); - if (queue.getPersistenceId()) { - THROW_STORE_EXCEPTION("Queue already created: " + queue.getName()); + if (queue_.getPersistenceId()) { + THROW_STORE_EXCEPTION("Queue already created: " + queue_.getName()); } JournalImpl* jQueue = 0; - qpid::framing::FieldTable::ValuePtr value; // uint16_t localFileCount = numJrnlFiles; // bool localAutoExpandFlag = autoJrnlExpand; @@ -408,18 +416,18 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, // if (value.get() != 0 && !value->empty() && value->convertsTo()) // localFileSizeSblks = chkJrnlFileSizeParam((uint32_t) value->get(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE; - if (queue.getName().size() == 0) + if (queue_.getName().size() == 0) { QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); return; } - jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue), std::string("JournalData"), + jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_), /*std::string("JournalData"),*/ defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { qpid::sys::Mutex::ScopedLock sl(journalListLock); - journalList[queue.getName()]=jQueue; + journalList[queue_.getName()]=jQueue; } // value = args.get("qpid.auto_expand"); @@ -429,25 +437,71 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, // value = args.get("qpid.auto_expand_max_jfiles"); // if (value.get() != 0 && !value->empty() && value->convertsTo()) // localAutoExpandMaxFileCount = (uint16_t) value->get(); +/* + qpid::framing::FieldTable::ValuePtr value; + qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = efpPartition; + value = args_.get("qpid.efp_partition"); + if (value.get() != 0 && !value->empty() && value->convertsTo()) { + localEfpPartition = chkEfpPartition((uint32_t)value->get(), "qpid.efp_partition"); + } + + qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = efpFileSizeKib; + value = args_.get("qpid.efp_file_size"); + if (value.get() != 0 && !value->empty() && value->convertsTo()) { + localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get(),"qpid.efp_file_size" ); + } +*/ - queue.setExternalQueueStore(dynamic_cast(jQueue)); + queue_.setExternalQueueStore(dynamic_cast(jQueue)); try { // init will create the deque's for the init... - jQueue->initialize(/*localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks,*/ wCacheNumPages, wCachePgSizeSblks); +// jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks); + jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks); } catch (const qpid::qls_jrnl::jexception& e) { - THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what()); + THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what()); } try { - if (!create(queueDb, queueIdSequence, queue)) { - THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName()); + if (!create(queueDb, queueIdSequence, queue_)) { + THROW_STORE_EXCEPTION("Queue already exists: " + queue_.getName()); } } catch (const DbException& e) { - THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e); + THROW_STORE_EXCEPTION_2("Error creating queue named " + queue_.getName(), e); + } +} + +qpid::qls_jrnl::EmptyFilePool* +MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_, + const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_) { + qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_); + if (efpp == 0) { + std::ostringstream oss; + oss << "Partition=" << efpPartitionNumber_ << "; EfpFileSize=" << efpFileSizeKib_ << " KiB"; + throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool"); } + return efpp; } -void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue) +qpid::qls_jrnl::EmptyFilePool* +MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { + qpid::framing::FieldTable::ValuePtr value; + qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; + value = args_.get("qpid.efp_partition"); + if (value.get() != 0 && !value->empty() && value->convertsTo()) { + localEfpPartition = chkEfpPartition((uint32_t)value->get(), "qpid.efp_partition"); + } + + qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = defaultEfpFileSizeKib; + value = args_.get("qpid.efp_file_size"); + if (value.get() != 0 && !value->empty() && value->convertsTo()) { + localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get(),"qpid.efp_file_size" ); + } + return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib); +} + +void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue_) { + QLS_LOG(info, "*** MessageStoreImpl::destroy() queue=\"" << queue_.getName() << "\""); +/* checkInit(); destroy(queueDb, queue); deleteBindingsForQueue(queue); @@ -461,21 +515,22 @@ void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue) journalList.erase(queue.getName()); } } +*/ } -void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange, - const qpid::framing::FieldTable& /*args*/) +void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange_, + const qpid::framing::FieldTable& /*args_*/) { checkInit(); - if (exchange.getPersistenceId()) { - THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName()); + if (exchange_.getPersistenceId()) { + THROW_STORE_EXCEPTION("Exchange already created: " + exchange_.getName()); } try { - if (!create(exchangeDb, exchangeIdSequence, exchange)) { - THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName()); + if (!create(exchangeDb, exchangeIdSequence, exchange_)) { + THROW_STORE_EXCEPTION("Exchange already exists: " + exchange_.getName()); } } catch (const DbException& e) { - THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange.getName(), e); + THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange_.getName(), e); } } @@ -488,14 +543,14 @@ void MessageStoreImpl::destroy(const qpid::broker::PersistableExchange& exchange bindingDb->del(0, &key, DB_AUTO_COMMIT); } -void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general) +void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general_) { checkInit(); - if (general.getPersistenceId()) { + if (general_.getPersistenceId()) { THROW_STORE_EXCEPTION("General configuration item already created"); } try { - if (!create(generalDb, generalIdSequence, general)) { + if (!create(generalDb, generalIdSequence, general_)) { THROW_STORE_EXCEPTION("General configuration already exists"); } } catch (const DbException& e) { @@ -503,25 +558,25 @@ void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general) } } -void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general) +void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general_) { checkInit(); - destroy(generalDb, general); + destroy(generalDb, general_); } -bool MessageStoreImpl::create(db_ptr db, - IdSequence& seq, - const qpid::broker::Persistable& p) +bool MessageStoreImpl::create(db_ptr db_, + IdSequence& seq_, + const qpid::broker::Persistable& p_) { - uint64_t id (seq.next()); + uint64_t id (seq_.next()); Dbt key(&id, sizeof(id)); - BufferValue value (p); + BufferValue value (p_); int status; TxnCtxt txn; txn.begin(dbenv.get(), true); try { - status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE); + status = db_->put(txn.get(), &key, &value, DB_NOOVERWRITE); txn.commit(); } catch (...) { txn.abort(); @@ -530,27 +585,27 @@ bool MessageStoreImpl::create(db_ptr db, if (status == DB_KEYEXIST) { return false; } else { - p.setPersistenceId(id); + p_.setPersistenceId(id); return true; } } -void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p) +void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p_) { qpid::sys::Mutex::ScopedLock sl(bdbLock); - IdDbt key(p.getPersistenceId()); + IdDbt key(p_.getPersistenceId()); db->del(0, &key, DB_AUTO_COMMIT); } -void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e, - const qpid::broker::PersistableQueue& q, - const std::string& k, - const qpid::framing::FieldTable& a) +void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e_, + const qpid::broker::PersistableQueue& q_, + const std::string& k_, + const qpid::framing::FieldTable& a_) { checkInit(); - IdDbt key(e.getPersistenceId()); - BindingDbt value(e, q, k, a); + IdDbt key(e_.getPersistenceId()); + BindingDbt value(e_, q_, k_, a_); TxnCtxt txn; txn.begin(dbenv.get(), true); try { @@ -562,16 +617,16 @@ void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e, } } -void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e, - const qpid::broker::PersistableQueue& q, - const std::string& k, - const qpid::framing::FieldTable&) +void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e_, + const qpid::broker::PersistableQueue& q_, + const std::string& k_, + const qpid::framing::FieldTable& /*a_*/) { checkInit(); - deleteBinding(e, q, k); + deleteBinding(e_, q_, k_); } -void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) +void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) { checkInit(); txn_list prepared; @@ -585,14 +640,14 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) txn.begin(dbenv.get(), false); try { //read all queues, calls recoversMessages - recoverQueues(txn, registry, queues, prepared, messages); + recoverQueues(txn, registry_, queues, prepared, messages); //recover exchange & bindings: - recoverExchanges(txn, registry, exchanges); + recoverExchanges(txn, registry_, exchanges); recoverBindings(txn, exchanges, queues); //recover general-purpose configuration - recoverGeneral(txn, registry); + recoverGeneral(txn, registry_); txn.commit(); } catch (const DbException& e) { @@ -629,7 +684,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) tpcc->prepare(tplStorePtr.get()); qpid::broker::RecoverableTransaction::shared_ptr dtx; - if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn); + if (!incomplTplTxnFlag) dtx = registry_.recoverTransaction(xid, txn); if (pt.enqueues.get()) { for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) { tpcc->addXidRecord(queues[j->first]->getExternalQueueStore()); @@ -669,15 +724,17 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) } } } - registry.recoveryComplete(); + registry_.recoveryComplete(); } -void MessageStoreImpl::recoverQueues(TxnCtxt& txn, - qpid::broker::RecoveryManager& registry, - queue_index& queue_index, - txn_list& prepared, - message_index& messages) +void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/, + qpid::broker::RecoveryManager& /*registry*/, + queue_index& /*queue_index*/, + txn_list& /*prepared*/, + message_index& /*messages*/) { + QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()"); +/* Cursor queues; queues.open(queueDb, txn.get()); @@ -714,24 +771,22 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, long rcnt = 0L; // recovered msg count long idcnt = 0L; // in-doubt msg count uint64_t thisHighestRid = 0ULL; - jQueue->recover(/*numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks,*/ wCacheNumPages, + jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery -/* - // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting - // from recovery of a store that has had its size changed externally by the resize utility. - // If so, update the queue store settings so that QMF queries will reflect the new values. - const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; - qpid::framing::FieldTable::ValuePtr value; - value = storeargs.get("qpid.file_count"); - if (value.get() != 0 && !value->empty() && value->convertsTo() && (uint16_t)value->get() != jQueue->num_jfiles()) { - queue->addArgument("qpid.file_count", jQueue->num_jfiles()); - } - value = storeargs.get("qpid.file_size"); - if (value.get() != 0 && !value->empty() && value->convertsTo() && (uint32_t)value->get() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { - queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); - } -*/ +// // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting +// // from recovery of a store that has had its size changed externally by the resize utility. +// // If so, update the queue store settings so that QMF queries will reflect the new values. +// const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; +// qpid::framing::FieldTable::ValuePtr value; +// value = storeargs.get("qpid.file_count"); +// if (value.get() != 0 && !value->empty() && value->convertsTo() && (uint16_t)value->get() != jQueue->num_jfiles()) { +// queue->addArgument("qpid.file_count", jQueue->num_jfiles()); +// } +// value = storeargs.get("qpid.file_size"); +// if (value.get() != 0 && !value->empty() && value->convertsTo() && (uint32_t)value->get() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { +// queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); +// } if (highestRid == 0ULL) highestRid = thisHighestRid; @@ -755,16 +810,17 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, QLS_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec); queueIdSequence.reset(maxQueueId + 1); +*/ } -void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, - qpid::broker::RecoveryManager& registry, - exchange_index& index) +void MessageStoreImpl::recoverExchanges(TxnCtxt& txn_, + qpid::broker::RecoveryManager& registry_, + exchange_index& index_) { //TODO: this is a copy&paste from recoverQueues - refactor! Cursor exchanges; - exchanges.open(exchangeDb, txn.get()); + exchanges.open(exchangeDb, txn_.get()); uint64_t maxExchangeId(1); IdDbt key; @@ -773,11 +829,11 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, while (exchanges.next(key, value)) { qpid::framing::Buffer buffer(reinterpret_cast(value.get_data()), value.get_size()); //create a Exchange instance - qpid::broker::RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer); + qpid::broker::RecoverableExchange::shared_ptr exchange = registry_.recoverExchange(buffer); if (exchange) { //set the persistenceId and update max as required exchange->setPersistenceId(key.id); - index[key.id] = exchange; + index_[key.id] = exchange; QLS_LOG(info, "Recovered exchange \"" << exchange->getName() << '"'); } maxExchangeId = std::max(key.id, maxExchangeId); @@ -785,12 +841,12 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, exchangeIdSequence.reset(maxExchangeId + 1); } -void MessageStoreImpl::recoverBindings(TxnCtxt& txn, - exchange_index& exchanges, - queue_index& queues) +void MessageStoreImpl::recoverBindings(TxnCtxt& txn_, + exchange_index& exchanges_, + queue_index& queues_) { Cursor bindings; - bindings.open(bindingDb, txn.get()); + bindings.open(bindingDb, txn_.get()); IdDbt key; Dbt value; @@ -807,9 +863,9 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn, buffer.getShortString(queueName); buffer.getShortString(routingkey); buffer.get(args); - exchange_index::iterator exchange = exchanges.find(key.id); - queue_index::iterator queue = queues.find(queueId); - if (exchange != exchanges.end() && queue != queues.end()) { + exchange_index::iterator exchange = exchanges_.find(key.id); + queue_index::iterator queue = queues_.find(queueId); + if (exchange != exchanges_.end() && queue != queues_.end()) { //could use the recoverable queue here rather than the name... exchange->second->bind(queueName, routingkey, args); QLS_LOG(info, "Recovered binding exchange=" << exchange->second->getName() @@ -823,11 +879,11 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn, } } -void MessageStoreImpl::recoverGeneral(TxnCtxt& txn, - qpid::broker::RecoveryManager& registry) +void MessageStoreImpl::recoverGeneral(TxnCtxt& txn_, + qpid::broker::RecoveryManager& registry_) { Cursor items; - items.open(generalDb, txn.get()); + items.open(generalDb, txn_.get()); uint64_t maxGeneralId(1); IdDbt key; @@ -836,7 +892,7 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn, while (items.next(key, value)) { qpid::framing::Buffer buffer(reinterpret_cast(value.get_data()), value.get_size()); //create instance - qpid::broker::RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer); + qpid::broker::RecoverableConfig::shared_ptr config = registry_.recoverConfig(buffer); //set the persistenceId and update max as required config->setPersistenceId(key.id); maxGeneralId = std::max(key.id, maxGeneralId); @@ -845,14 +901,16 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn, } void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, - qpid::broker::RecoveryManager& recovery, - qpid::broker::RecoverableQueue::shared_ptr& queue, - txn_list& prepared, - message_index& messages, - long& rcnt, - long& idcnt) + qpid::broker::RecoveryManager& /*recovery*/, + qpid::broker::RecoverableQueue::shared_ptr& queue_, + txn_list& /*prepared*/, + message_index& /*messages*/, + long& /*rcnt*/, + long& /*idcnt*/) { - size_t preambleLength = sizeof(uint32_t)/*header size*/; + QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue_->getName() << "\""); +/* + size_t preambleLength = sizeof(uint32_t)header size; JournalImpl* jc = static_cast(queue->getExternalQueueStore()); DataTokenImpl dtok; @@ -977,38 +1035,39 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what()); } +*/ } qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/, - uint64_t /*messageId*/, - unsigned& /*headerSize*/) + uint64_t /*messageId*/, + unsigned& /*headerSize*/) { throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage"); } -int MessageStoreImpl::enqueueMessage(TxnCtxt& txn, - IdDbt& msgId, - qpid::broker::RecoverableMessage::shared_ptr& msg, - queue_index& index, - txn_list& prepared, - message_index& messages) +int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, + IdDbt& msgId_, + qpid::broker::RecoverableMessage::shared_ptr& msg_, + queue_index& index_, + txn_list& prepared_, + message_index& messages_) { Cursor mappings; - mappings.open(mappingDb, txn.get()); + mappings.open(mappingDb, txn_.get()); IdDbt value; int count(0); - for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) { - if (index.find(value.id) == index.end()) { + for (int status = mappings->get(&msgId_, &value, DB_SET); status == 0; status = mappings->get(&msgId_, &value, DB_NEXT_DUP)) { + if (index_.find(value.id) == index_.end()) { QLS_LOG(warning, "Recovered message for queue that no longer exists"); mappings->del(0); } else { - qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id]; - if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) { - messages[msgId.id] = msg; + qpid::broker::RecoverableQueue::shared_ptr queue = index_[value.id]; + if (PreparedTransaction::isLocked(prepared_, value.id, msgId_.id)) { + messages_[msgId_.id] = msg_; } else { - queue->recover(msg); + queue->recover(msg_); } count++; } @@ -1019,6 +1078,8 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn, void MessageStoreImpl::readTplStore() { + QLS_LOG(info, "*** MessageStoreImpl::readTplStore()"); +/* tplRecoverMap.clear(); qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map(); DataTokenImpl dtok; @@ -1087,13 +1148,16 @@ void MessageStoreImpl::readTplStore() } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what()); } +*/ } void MessageStoreImpl::recoverTplStore() { + QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()"); +/* if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) { uint64_t thisHighestRid = 0ULL; - tplStorePtr->recover(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); + tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); if (highestRid == 0ULL) highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit @@ -1104,10 +1168,13 @@ void MessageStoreImpl::recoverTplStore() tplStorePtr->recover_complete(); // start journal. } +*/ } -void MessageStoreImpl::recoverLockedMappings(txn_list& txns) +void MessageStoreImpl::recoverLockedMappings(txn_list& /*txns*/) { + QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()"); +/* if (!tplStorePtr->is_ready()) recoverTplStore(); @@ -1119,10 +1186,13 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns) deq_ptr.reset(new LockedMappings); txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr)); } +*/ } -void MessageStoreImpl::collectPreparedXids(std::set& xids) +void MessageStoreImpl::collectPreparedXids(std::set& /*xids*/) { + QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()"); +/* if (tplStorePtr->is_ready()) { tplStorePtr->read_reset(); readTplStore(); @@ -1134,6 +1204,7 @@ void MessageStoreImpl::collectPreparedXids(std::set& xids) if (!i->second.deq_flag && i->second.tpc_flag) xids.insert(i->first); } +*/ } void MessageStoreImpl::stage(const boost::intrusive_ptr& /*msg*/) @@ -1147,17 +1218,19 @@ void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/) } void MessageStoreImpl::appendContent(const boost::intrusive_ptr& /*msg*/, - const std::string& /*data*/) + const std::string& /*data*/) { throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent"); } -void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue, - const boost::intrusive_ptr& msg, - std::string& data, - uint64_t offset, - uint32_t length) +void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue*/, + const boost::intrusive_ptr& /*msg*/, + std::string& /*data*/, + uint64_t /*offset*/, + uint32_t /*length*/) { + throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent"); +/* checkInit(); uint64_t messageId (msg->getPersistenceId()); @@ -1181,10 +1254,13 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue, } else { THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!"); } +*/ } -void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) { + QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\""); +/* if (queue.getExternalQueueStore() == 0) return; checkInit(); std::string qn = queue.getName(); @@ -1192,17 +1268,20 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue) JournalImpl* jc = static_cast(queue.getExternalQueueStore()); if (jc) { // TODO: check if this result should be used... - /*mrg::journal::iores res =*/ jc->flush(); + mrg::journal::iores res = jc->flush(); } } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() ); } +*/ } -void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt, - const boost::intrusive_ptr& msg, - const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* /*ctxt*/, + const boost::intrusive_ptr& msg_, + const qpid::broker::PersistableQueue& /*queue*/) { +// QLS_LOG(info, "*** MessageStoreImpl::enqueue() queue=\"" << queue.getName() << "\""); +/* checkInit(); uint64_t queueId (queue.getPersistenceId()); uint64_t messageId (msg->getPersistenceId()); @@ -1228,29 +1307,34 @@ void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt, // add queue* to the txn map.. if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); +*/ + msg_->enqueueComplete();// DEBUG: only while null fns in use } -uint64_t MessageStoreImpl::msgEncode(std::vector& buff, const boost::intrusive_ptr& message) +uint64_t MessageStoreImpl::msgEncode(std::vector& buff_, + const boost::intrusive_ptr& message_) { - uint32_t headerSize = message->encodedHeaderSize(); - uint64_t size = message->encodedSize() + sizeof(uint32_t); - try { buff = std::vector(size); } // long + headers + content + uint32_t headerSize = message_->encodedHeaderSize(); + uint64_t size = message_->encodedSize() + sizeof(uint32_t); + try { buff_ = std::vector(size); } // long + headers + content catch (const std::exception& e) { std::ostringstream oss; oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what(); THROW_STORE_EXCEPTION(oss.str()); } - qpid::framing::Buffer buffer(&buff[0],size); + qpid::framing::Buffer buffer(&buff_[0],size); buffer.putLong(headerSize); - message->encode(buffer); + message_->encode(buffer); return size; } -void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue, - TxnCtxt* txn, - const boost::intrusive_ptr& message, - bool /*newId*/) +void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_, + TxnCtxt* /*txn*/, + const boost::intrusive_ptr& /*message*/, + bool /*newId*/) { + QLS_LOG(info, "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\""); +/* std::vector buff; uint64_t size = msgEncode(buff, message); @@ -1275,12 +1359,15 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue, THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " + e.what()); } +*/ } -void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt, - const boost::intrusive_ptr& msg, - const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* /*ctxt*/, + const boost::intrusive_ptr& msg_, + const qpid::broker::PersistableQueue& /*queue*/) { +// QLS_LOG(info, "*** MessageStoreImpl::dequeue() queue=\"" << queue.getName() << "\""); +/* checkInit(); uint64_t queueId (queue.getPersistenceId()); uint64_t messageId (msg->getPersistenceId()); @@ -1302,14 +1389,16 @@ void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt, // add queue* to the txn map.. if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); async_dequeue(ctxt, msg, queue); - - msg->dequeueComplete(); +*/ + msg_->dequeueComplete(); } -void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt, - const boost::intrusive_ptr& msg, - const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* /*ctxt*/, + const boost::intrusive_ptr& /*msg*/, + const qpid::broker::PersistableQueue& queue_) { + QLS_LOG(info, "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\""); +/* boost::intrusive_ptr ddtokp(new DataTokenImpl); ddtokp->setSourceMessage(msg); ddtokp->set_external_rid(true); @@ -1334,38 +1423,39 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt, ddtokp->release(); THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what()); } +*/ } -uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) +uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/) { - checkInit(); +/* checkInit();*/ return 0; } -void MessageStoreImpl::completed(TxnCtxt& txn, - bool commit) +void MessageStoreImpl::completed(TxnCtxt& txn_, + bool commit_) { try { chkTplStoreInit(); // Late initialize (if needed) // Nothing to do if not prepared - if (txn.getDtok()->is_enqueued()) { - txn.incrDtokRef(); - DataTokenImpl* dtokp = txn.getDtok(); + if (txn_.getDtok()->is_enqueued()) { + txn_.incrDtokRef(); + DataTokenImpl* dtokp = txn_.getDtok(); dtokp->set_dequeue_rid(dtokp->rid()); dtokp->set_rid(messageIdSequence.next()); - tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit); + tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_); } - txn.complete(commit); + txn_.complete(commit_); if (mgmtObject.get() != 0) { mgmtObject->dec_tplTransactionDepth(); - if (commit) + if (commit_) mgmtObject->inc_tplTxnCommits(); else mgmtObject->inc_tplTxnAborts(); } } catch (const std::exception& e) { - QLS_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what()); + QLS_LOG(error, "Error completing xid " << txn_.getXid() << ": " << e.what()); throw; } } @@ -1377,54 +1467,54 @@ std::auto_ptr MessageStoreImpl::begin() return std::auto_ptr(new TxnCtxt(&messageIdSequence)); } -std::auto_ptr MessageStoreImpl::begin(const std::string& xid) +std::auto_ptr MessageStoreImpl::begin(const std::string& xid_) { checkInit(); IdSequence* jtx = &messageIdSequence; // pass sequence number for c/a - return std::auto_ptr(new TPCTxnCtxt(xid, jtx)); + return std::auto_ptr(new TPCTxnCtxt(xid_, jtx)); } -void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt) +void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt_) { checkInit(); - TxnCtxt* txn = dynamic_cast(&ctxt); + TxnCtxt* txn = dynamic_cast(&ctxt_); if(!txn) throw qpid::broker::InvalidTransactionContextException(); localPrepare(txn); } -void MessageStoreImpl::localPrepare(TxnCtxt* ctxt) +void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_) { try { chkTplStoreInit(); // Late initialize (if needed) // This sync is required to ensure multi-queue atomicity - ie all txn data // must hit the disk on *all* queues before the TPL prepare (enq) is written. - ctxt->sync(); + ctxt_->sync(); - ctxt->incrDtokRef(); - DataTokenImpl* dtokp = ctxt->getDtok(); + ctxt_->incrDtokRef(); + DataTokenImpl* dtokp = ctxt_->getDtok(); dtokp->set_external_rid(true); dtokp->set_rid(messageIdSequence.next()); - char tpcFlag = static_cast(ctxt->isTPC()); - tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false); - ctxt->prepare(tplStorePtr.get()); + char tpcFlag = static_cast(ctxt_->isTPC()); + tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false); + ctxt_->prepare(tplStorePtr.get()); // make sure all the data is written to disk before returning - ctxt->sync(); + ctxt_->sync(); if (mgmtObject.get() != 0) { mgmtObject->inc_tplTransactionDepth(); mgmtObject->inc_tplTxnPrepares(); } } catch (const std::exception& e) { - QLS_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what()); + QLS_LOG(error, "Error preparing xid " << ctxt_->getXid() << ": " << e.what()); throw; } } -void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt) +void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt_) { checkInit(); - TxnCtxt* txn(check(&ctxt)); + TxnCtxt* txn(check(&ctxt_)); if (!txn->isTPC()) { if (txn->impactedQueuesEmpty()) return; localPrepare(dynamic_cast(txn)); @@ -1432,10 +1522,10 @@ void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt) completed(*dynamic_cast(txn), true); } -void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt) +void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt_) { checkInit(); - TxnCtxt* txn(check(&ctxt)); + TxnCtxt* txn(check(&ctxt_)); if (!txn->isTPC()) { if (txn->impactedQueuesEmpty()) return; localPrepare(dynamic_cast(txn)); @@ -1443,20 +1533,20 @@ void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt) completed(*dynamic_cast(txn), false); } -TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt) +TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt_) { - TxnCtxt* txn = dynamic_cast(ctxt); + TxnCtxt* txn = dynamic_cast(ctxt_); if(!txn) throw qpid::broker::InvalidTransactionContextException(); return txn; } -void MessageStoreImpl::put(db_ptr db, - DbTxn* txn, - Dbt& key, - Dbt& value) +void MessageStoreImpl::put(db_ptr db_, + DbTxn* txn_, + Dbt& key_, + Dbt& value_) { try { - int status = db->put(txn, &key, &value, DB_NODUPDATA); + int status = db_->put(txn_, &key_, &value_, DB_NODUPDATA); if (status == DB_KEYEXIST) { THROW_STORE_EXCEPTION("duplicate data"); } else if (status) { @@ -1467,7 +1557,7 @@ void MessageStoreImpl::put(db_ptr db, } } -void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue) +void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue_) { TxnCtxt txn; txn.begin(dbenv.get(), true); @@ -1484,9 +1574,9 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue THROW_STORE_EXCEPTION("Not enough data for binding"); } uint64_t queueId = buffer.getLongLong(); - if (queue.getPersistenceId() == queueId) { + if (queue_.getPersistenceId() == queueId) { bindings->del(0); - QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); + QLS_LOG(debug, "Deleting binding for " << queue_.getName() << " " << key.id << "->" << queueId); } } } @@ -1498,12 +1588,12 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue txn.abort(); throw; } - QLS_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId()); + QLS_LOG(debug, "Deleted all bindings for " << queue_.getName() << ":" << queue_.getPersistenceId()); } -void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange, - const qpid::broker::PersistableQueue& queue, - const std::string& bkey) +void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange_, + const qpid::broker::PersistableQueue& queue_, + const std::string& bkey_) { TxnCtxt txn; txn.begin(dbenv.get(), true); @@ -1512,7 +1602,7 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex Cursor bindings; bindings.open(bindingDb, txn.get()); - IdDbt key(exchange.getPersistenceId()); + IdDbt key(exchange_.getPersistenceId()); Dbt value; for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) { @@ -1521,14 +1611,14 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex THROW_STORE_EXCEPTION("Not enough data for binding"); } uint64_t queueId = buffer.getLongLong(); - if (queue.getPersistenceId() == queueId) { + if (queue_.getPersistenceId() == queueId) { std::string q; std::string k; buffer.getShortString(q); buffer.getShortString(k); - if (bkey == k) { + if (bkey_ == k) { bindings->del(0); - QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); + QLS_LOG(debug, "Deleting binding for " << queue_.getName() << " " << key.id << "->" << queueId); } } } @@ -1543,6 +1633,13 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex } } +std::string MessageStoreImpl::getStoreTopLevelDir() { + std::ostringstream dir; + dir << storeDir << "/" << storeTopLevelDir; + return dir.str(); +} + + std::string MessageStoreImpl::getJrnlBaseDir() { std::ostringstream dir; @@ -1564,43 +1661,28 @@ std::string MessageStoreImpl::getTplBaseDir() return dir.str(); } -std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/ +std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue_) //for exmaple /var/rhm/ + queueDir/ { - return getJrnlHashDir(queue.getName().c_str()); -} - -uint32_t MessageStoreImpl::bHash(const std::string str) -{ - // Daniel Bernstein hash fn - uint32_t h = 0; - for (std::string::const_iterator i = str.begin(); i < str.end(); i++) - h = 33*h + *i; - return h; -} - -std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) //for exmaple /var/rhm/ + queueDir/ -{ - std::stringstream dir; - dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4); - dir << (bHash(queueName.c_str()) % 29); // Use a prime number for better distribution across dirs - dir << "/" << queueName << "/"; - return dir.str(); + /*return getJrnlHashDir(queue_.getName().c_str());*/ + std::ostringstream oss; + oss << getJrnlBaseDir() << queue_.getName(); + return oss.str(); } std::string MessageStoreImpl::getStoreDir() const { return storeDir; } -void MessageStoreImpl::journalDeleted(JournalImpl& j) { +void MessageStoreImpl::journalDeleted(JournalImpl& j_) { qpid::sys::Mutex::ScopedLock sl(journalListLock); - journalList.erase(j.id()); + journalList.erase(j_.id()); } -MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : - qpid::Options(name), +MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : + qpid::Options(name_), truncateFlag(defTruncateFlag), wCachePageSizeKib(defWCachePageSize), tplWCachePageSizeKib(defTplWCachePageSize), efpPartition(defEfpPartition), - efpFileSize(defEfpFileSize) + efpFileSizeKib(defEfpFileSizeKib) { addOptions() ("store-dir", qpid::optValue(storeDir, "DIR"), @@ -1619,8 +1701,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : "Lower values decrease latency at the expense of throughput.") ("efp-partition", qpid::optValue(efpPartition, "N"), "Empty File Pool partition to use for finding empty journal files") - ("efp-file-size", qpid::optValue(efpFileSize, "N"), - "Empty File Pool file size to use for journal files") + ("efp-file-size", qpid::optValue(efpFileSizeKib, "N"), + "Empty File Pool file size in KiB to use for journal files. Must be a multiple of 4 KiB.") ; } diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 5d5fa28ff8..8d88e9da97 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -26,10 +26,12 @@ #include "db-inc.h" #include "qpid/linearstore/Cursor.h" +#include "qpid/linearstore/EmptyFilePoolManagerImpl.h" #include "qpid/linearstore/IdDbt.h" #include "qpid/linearstore/IdSequence.h" #include "qpid/linearstore/JournalImpl.h" #include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" #include "qpid/linearstore/PreparedTransaction.h" #include "qpid/broker/Broker.h" #include "qpid/broker/MessageStore.h" @@ -48,6 +50,9 @@ class Timer; }} namespace qpid{ +namespace qls_jrnl { +class EmptyFilePoolManager; +} namespace linearstore{ /** @@ -67,7 +72,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem uint32_t wCachePageSizeKib; uint32_t tplWCachePageSizeKib; uint16_t efpPartition; - uint64_t efpFileSize; + uint64_t efpFileSizeKib; }; protected: @@ -98,10 +103,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem static const bool defTruncateFlag = false; static const uint32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024; static const uint32_t defTplWCachePageSize = defWCachePageSize / 8; - static const uint16_t defEfpPartition = 0; - static const uint64_t defEfpFileSize = 512 * JRNL_SBLK_SIZE; - + static const uint16_t defEfpPartition = 1; + static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE / 1024; static const std::string storeTopLevelDir; + static qpid::sys::Duration defJournalGetEventsTimeout; static qpid::sys::Duration defJournalFlushTimeout; @@ -127,21 +132,18 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem IdSequence generalIdSequence; IdSequence messageIdSequence; std::string storeDir; - uint16_t numJrnlFiles; - bool autoJrnlExpand; - uint16_t autoJrnlExpandMaxFiles; - uint32_t jrnlFsizeSblks; + qpid::qls_jrnl::efpPartitionNumber_t defaultEfpPartitionNumber; + qpid::qls_jrnl::efpFileSizeKib_t defaultEfpFileSizeKib; bool truncateFlag; uint32_t wCachePgSizeSblks; uint16_t wCacheNumPages; - uint16_t tplNumJrnlFiles; - uint32_t tplJrnlFsizeSblks; uint32_t tplWCachePgSizeSblks; uint16_t tplWCacheNumPages; uint64_t highestRid; bool isInit; const char* envPath; qpid::broker::Broker* broker; + boost::shared_ptr efpMgr; qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject; qpid::management::ManagementAgent* agent; @@ -149,9 +151,13 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // Parameter validation and calculation static uint32_t chkJrnlWrPageCacheSize(const uint32_t param, - const std::string paramName/*, - const uint16_t jrnlFsizePgs*/); - static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKib); + const std::string& paramName/*, + const uint16_t jrnlFsizePgs*/); + static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB); + static qpid::qls_jrnl::efpPartitionNumber_t chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition, + const std::string& paramName); + static qpid::qls_jrnl::efpFileSizeKib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKiB, + const std::string& paramName); void init(); @@ -225,9 +231,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // journal functions void createJrnlQueue(const qpid::broker::PersistableQueue& queue); - uint32_t bHash(const std::string str); std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/ - std::string getJrnlHashDir(const std::string& queueName); + qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpFileSizeKib_t s); + qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args); + std::string getStoreTopLevelDir(); std::string getJrnlBaseDir(); std::string getBdbBaseDir(); std::string getTplBaseDir(); @@ -260,11 +267,13 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem bool init(const qpid::Options* options); bool init(const std::string& dir, + qpid::qls_jrnl::efpPartitionNumber_t efpPartition = defEfpPartition, + qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib = defEfpFileSizeKib, const bool truncateFlag = false, uint32_t wCachePageSize = defWCachePageSize, uint32_t tplWCachePageSize = defTplWCachePageSize); - void truncateInit(const bool saveStoreContent = false); + void truncateInit(); void initManagement (); diff --git a/qpid/cpp/src/qpid/linearstore/QpidLog.h b/qpid/cpp/src/qpid/linearstore/QpidLog.h new file mode 100644 index 0000000000..b03ea7ac9d --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/QpidLog.h @@ -0,0 +1,30 @@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LEGACYSTORE_LOG_H +#define QPID_LEGACYSTORE_LOG_H + +#include "qpid/log/Statement.h" + +#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg) +#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg) + +#endif // QPID_LEGACYSTORE_LOG_H diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp index 840468d8bd..bf574d9740 100644 --- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp +++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp @@ -23,7 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/DataDir.h" -#include "qpid/linearstore/Log.h" +#include "qpid/linearstore/QpidLog.h" #include "qpid/linearstore/MessageStoreImpl.h" using qpid::linearstore::MessageStoreImpl; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp new file mode 100644 index 0000000000..90e04df7ed --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp @@ -0,0 +1,255 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "EmptyFilePool.h" + +#include +#include +#include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/JournalFile.h" +#include "qpid/linearstore/jrnl/slock.h" +#include "qpid/linearstore/jrnl/utils/file_hdr.h" +#include +#include +#include + +#include // DEBUG + +namespace qpid { +namespace qls_jrnl { + +EmptyFilePool::EmptyFilePool(const std::string& efpDirectory_, + const EmptyFilePoolPartition* partitionPtr_) : + efpDirectory(efpDirectory_), + efpFileSizeKib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())), + partitionPtr(partitionPtr_) +{} + +EmptyFilePool::~EmptyFilePool() {} + +void +EmptyFilePool::initialize() { + //std::cout << "Reading " << efpDirectory << std::endl; // DEBUG + std::vector dirList; + jdir::read_dir(efpDirectory, dirList, false, true, false); + for (std::vector::iterator i = dirList.begin(); i != dirList.end(); ++i) { + size_t dotPos = i->rfind("."); + if (dotPos != std::string::npos) { + if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) { + std::string emptyFile(efpDirectory + "/" + (*i)); + if (validateEmptyFile(emptyFile)) { + pushEmptyFile(emptyFile); + } + } + } + } + //std::cout << "Found " << emptyFileList.size() << " files" << std::endl; // DEBUG +} + +efpFileSizeKib_t +EmptyFilePool::fileSizeKib() const { + return efpFileSizeKib; +} + +efpFileCount_t +EmptyFilePool::numEmptyFiles() const { + slock l(emptyFileListMutex); + return efpFileCount_t(emptyFileList.size()); +} + +efpFileSizeKib_t +EmptyFilePool::cumFileSizeKib() const { + slock l(emptyFileListMutex); + return efpFileSizeKib_t(emptyFileList.size()) * efpFileSizeKib; +} + +efpPartitionNumber_t +EmptyFilePool::getPartitionNumber() const { + return partitionPtr->partitionNumber(); +} + +const EmptyFilePoolPartition* +EmptyFilePool::getPartition() const { + return partitionPtr; +} + +const efpIdentity_t +EmptyFilePool::getIdentity() const { + return efpIdentity_t(partitionPtr->partitionNumber(), efpFileSizeKib); +} + +std::string +EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { + std::string emptyFileName = popEmptyFile(); + std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' + if (::rename(emptyFileName.c_str(), newFileName.c_str())) { + pushEmptyFile(emptyFileName); + std::ostringstream oss; + oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile"); + } + return newFileName; +} + +bool +EmptyFilePool::returnEmptyFile(const JournalFile* srcFile) { + std::string emptyFileName(efpDirectory + srcFile->fileName()); + // TODO: reset file here + if (::rename(srcFile->fqFileName().c_str(), emptyFileName.c_str())) { + std::ostringstream oss; + oss << "file=\"" << srcFile << "\" dest=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); + } + pushEmptyFile(emptyFileName); + return true; +} + +// protected + +void +EmptyFilePool::pushEmptyFile(const std::string fqFileName_) { + slock l(emptyFileListMutex); + emptyFileList.push_back(fqFileName_); +} + +std::string +EmptyFilePool::popEmptyFile() { + std::string emptyFileName; + bool isEmpty = false; + { + slock l(emptyFileListMutex); + isEmpty = emptyFileList.empty(); + } + if (isEmpty) { + createEmptyFile(); + } + { + slock l(emptyFileListMutex); + emptyFileName = emptyFileList.front(); + emptyFileList.pop_front(); + } + return emptyFileName; +} + +void +EmptyFilePool::createEmptyFile() { + file_hdr_t fh; + ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDRSIZESBLKS, partitionPtr->partitionNumber(), + efpFileSizeKib); + std::string efpfn = getEfpFileName(); + std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary); + if (ofs.good()) { + ofs.write((char*)&fh, sizeof(file_hdr_t)); + uint64_t rem = ((efpFileSizeKib + (QLS_JRNL_FHDRSIZESBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(file_hdr_t); + while (rem--) + ofs.put('\0'); + ofs.close(); + pushEmptyFile(efpfn); + std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << + efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; + } else { + std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG + } +} + +bool +EmptyFilePool::validateEmptyFile(const std::string& emptyFileName_) const { + struct stat s; + if (::stat(emptyFileName_.c_str(), &s)) + { + std::ostringstream oss; + oss << "stat: file=\"" << emptyFileName_ << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile"); + } + efpFileSizeKib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpFileSizeKib) * 1024; + if ((efpFileSizeKib_t)s.st_size != expectedSize) { + //std::cout << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size << std::endl; // DEBUG + return false; + } + + std::ifstream ifs(emptyFileName_.c_str(), std::ifstream::in | std::ifstream::binary); + if (!ifs) { + //std::cout << "ERROR: File " << emptyFileName << ": Unable to open for reading" << std::endl; + return false; + } + + const uint8_t fhFileNameBuffLen = 50; + char fhFileNameBuff[fhFileNameBuffLen]; + file_hdr_t fh; + ifs.read((char*)&fh, sizeof(file_hdr_t)); + uint16_t fhFileNameLen = fh._queue_name_len > fhFileNameBuffLen ? fhFileNameBuffLen : fh._queue_name_len; + ifs.read(fhFileNameBuff, fhFileNameLen); + std::string fhFileName(fhFileNameBuff, fhFileNameLen); + ifs.close(); + + if (fh._rhdr._magic != QLS_FILE_MAGIC || + fh._rhdr._version != QLS_JRNL_VERSION || + fh._efp_partition != partitionPtr->partitionNumber() || + fh._file_size_kib != efpFileSizeKib || + !::is_file_hdr_reset(&fh)) + { + //std::cout << "ERROR: File " << emptyFileName << ": Invalid file header" << std::endl; + return false; + } + + return true; +} + +std::string +EmptyFilePool::getEfpFileName() { + uuid_t uuid; + ::uuid_generate(uuid); // NOTE: NOT THREAD SAFE + char uuid_str[37]; // 36 char uuid + trailing \0 + ::uuid_unparse(uuid, uuid_str); + std::ostringstream oss; + oss << efpDirectory << "/" << uuid_str << QLS_JRNL_FILE_EXTENSION; + return oss.str(); +} + +// protected +// static +efpFileSizeKib_t +EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_, + const efpPartitionNumber_t partitionNumber_) { + // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0. + std::string n(dirName_.substr(dirName_.rfind('/')+1)); + bool valid = true; + for (uint16_t charNum = 0; charNum < n.length(); ++charNum) { + if (charNum < n.length()-1) { + if (!::isdigit((int)n[charNum])) { + valid = false; + break; + } + } else { + valid = n[charNum] == 'k'; + } + } + efpFileSizeKib_t s = ::atol(n.c_str()); + if (!valid || s == 0 || s % JRNL_SBLK_SIZE_KIB != 0) { + std::ostringstream oss; + oss << "Partition: " << partitionNumber_ << "; EFP directory: \'" << n << "\'"; + throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName"); + } + return s; +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h new file mode 100644 index 0000000000..1958bb7647 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_QLS_JRNL_EMPTYFILEPOOL_H_ +#define QPID_QLS_JRNL_EMPTYFILEPOOL_H_ + +namespace qpid { +namespace qls_jrnl { + + class EmptyFilePool; + +}} // namespace qpid::qls_jrnl + +#include +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" +#include "qpid/linearstore/jrnl/smutex.h" +#include + +namespace qpid { +namespace qls_jrnl { +class jdir; +class JournalFile; + +class EmptyFilePool +{ +protected: + typedef std::deque emptyFileList_t; + typedef emptyFileList_t::iterator emptyFileListItr_t; + + const std::string efpDirectory; + const efpFileSizeKib_t efpFileSizeKib; + const EmptyFilePoolPartition* partitionPtr; + +private: + emptyFileList_t emptyFileList; + smutex emptyFileListMutex; + +public: + EmptyFilePool(const std::string& efpDirectory_, + const EmptyFilePoolPartition* partitionPtr_); + virtual ~EmptyFilePool(); + + void initialize(); + efpFileSizeKib_t fileSizeKib() const; + efpFileCount_t numEmptyFiles() const; + efpFileSizeKib_t cumFileSizeKib() const; + efpPartitionNumber_t getPartitionNumber() const; + const EmptyFilePoolPartition* getPartition() const; + const efpIdentity_t getIdentity() const; + + std::string takeEmptyFile(const std::string& destDirectory_); + bool returnEmptyFile(const JournalFile* srcFile_); + +protected: + void pushEmptyFile(const std::string fqFileName_); + std::string popEmptyFile(); + void createEmptyFile(); + bool validateEmptyFile(const std::string& emptyFileName_) const; + std::string getEfpFileName(); + static efpFileSizeKib_t fileSizeKbFromDirName(const std::string& dirName_, + const efpPartitionNumber_t partitionNumber_); +}; + +}} // namespace qpid::qls_jrnl + +#endif /* QPID_QLS_JRNL_EMPTYFILEPOOL_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp new file mode 100644 index 0000000000..1bc716f912 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "EmptyFilePoolManager.h" + +#include +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" +#include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/slock.h" +#include + +// DEBUG +//#include + +namespace qpid { +namespace qls_jrnl { + +EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath_) : + qlsStorePath(qlsStorePath_) +{} + +EmptyFilePoolManager::~EmptyFilePoolManager() { + slock l(partitionMapMutex); + for (partitionMapItr_t i = partitionMap.begin(); i != partitionMap.end(); ++i) { + delete i->second; + } + partitionMap.clear(); +} + +void +EmptyFilePoolManager::findEfpPartitions() { + //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG + std::vector dirList; + jdir::read_dir(qlsStorePath, dirList, true, false, true); + for (std::vector::iterator i = dirList.begin(); i != dirList.end(); ++i) { + if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN + efpPartitionNumber_t pn = ::atoi(i->c_str() + 1); + std::string fullDirPath(qlsStorePath + "/" + (*i)); + EmptyFilePoolPartition* efppp = 0; + try { + efppp = new EmptyFilePoolPartition(pn, fullDirPath); + { + slock l(partitionMapMutex); + partitionMap[pn] = efppp; + } + } catch (const std::exception& e) { + if (efppp != 0) { + delete efppp; + efppp = 0; + } + //std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; + } + if (efppp != 0) + efppp->findEmptyFilePools(); + } + } +} + +uint16_t +EmptyFilePoolManager::getNumEfpPartitions() const { + return partitionMap.size(); +} + +EmptyFilePoolPartition* +EmptyFilePoolManager::getEfpPartition(const efpPartitionNumber_t partitionNumber) { + partitionMapItr_t i = partitionMap.find(partitionNumber); + if (i == partitionMap.end()) + return 0; + else + return i->second; +} + +void +EmptyFilePoolManager::getEfpPartitionNumbers(std::vector& partitionNumberList, + const efpFileSizeKib_t efpFileSizeKb) const { + slock l(partitionMapMutex); + for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { + if (efpFileSizeKb == 0) { + partitionNumberList.push_back(i->first); + } else { + std::vector efpFileSizeList; + i->second->getEmptyFilePoolSizesKb(efpFileSizeList); + for (std::vector::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) { + if (*j == efpFileSizeKb) { + partitionNumberList.push_back(i->first); + break; + } + } + } + } +} + +void +EmptyFilePoolManager::getEfpPartitions(std::vector& partitionList, + const efpFileSizeKib_t efpFileSizeKb) { + slock l(partitionMapMutex); + for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { + if (efpFileSizeKb == 0) { + partitionList.push_back(i->second); + } else { + std::vector efpFileSizeList; + i->second->getEmptyFilePoolSizesKb(efpFileSizeList); + for (std::vector::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) { + if (*j == efpFileSizeKb) { + partitionList.push_back(i->second); + break; + } + } + } + } +} + +void +EmptyFilePoolManager::getEfpFileSizes(std::vector& efpFileSizeList, + const efpPartitionNumber_t efpPartitionNumber) const { + if (efpPartitionNumber == 0) { + for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { + i->second->getEmptyFilePoolSizesKb(efpFileSizeList); + } + } else { + partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber); + if (i != partitionMap.end()) { + i->second->getEmptyFilePoolSizesKb(efpFileSizeList); + } + } +} + +void +EmptyFilePoolManager::getEmptyFilePools(std::vector& emptyFilePoolList, + const efpPartitionNumber_t efpPartitionNumber) { + if (efpPartitionNumber == 0) { + for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { + i->second->getEmptyFilePools(emptyFilePoolList); + } + } else { + partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber); + if (i != partitionMap.end()) { + i->second->getEmptyFilePools(emptyFilePoolList); + } + } +} + +EmptyFilePool* +EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber, + const efpFileSizeKib_t efpFileSizeKib) { + EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber); + if (efppp != 0) + return efppp->getEmptyFilePool(efpFileSizeKib); + return 0; +} + +EmptyFilePool* +EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) { + return getEmptyFilePool(efpIdentity.first, efpIdentity.second); +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h new file mode 100644 index 0000000000..e34eb2c0f3 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ +#define QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ + +#include +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" +#include "qpid/linearstore/jrnl/smutex.h" +#include + +namespace qpid { +namespace qls_jrnl { + +class EmptyFilePoolManager +{ +protected: + typedef std::map partitionMap_t; + typedef partitionMap_t::iterator partitionMapItr_t; + typedef partitionMap_t::const_iterator partitionMapConstItr_t; + + std::string qlsStorePath; + partitionMap_t partitionMap; + smutex partitionMapMutex; + +public: + EmptyFilePoolManager(const std::string& qlsStorePath_); + virtual ~EmptyFilePoolManager(); + void findEfpPartitions(); + + uint16_t getNumEfpPartitions() const; + EmptyFilePoolPartition* getEfpPartition(const efpPartitionNumber_t partitionNumber); + void getEfpPartitionNumbers(std::vector& partitionNumberList, const efpFileSizeKib_t efpFileSizeKb = 0) const; + void getEfpPartitions(std::vector& partitionList, const efpFileSizeKib_t efpFileSizeKb = 0); + + void getEfpFileSizes(std::vector& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const; + void getEmptyFilePools(std::vector& emptyFilePoolList, const efpPartitionNumber_t efpPartitionNumber = 0); + + EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpFileSizeKib_t efpFileSizeKb); + EmptyFilePool* getEmptyFilePool(const efpIdentity_t efpIdentity); +}; + +}} // namespace qpid::qls_jrnl + +#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLMANAGER_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp new file mode 100644 index 0000000000..6e6bbc4abd --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp @@ -0,0 +1,134 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" + +#include +#include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/jerrno.h" +#include "qpid/linearstore/jrnl/jexception.h" +#include "qpid/linearstore/jrnl/slock.h" + +//#include // DEBUG + +namespace qpid { +namespace qls_jrnl { + +const std::string EmptyFilePoolPartition::efpTopLevelDir("efp"); // Sets the top-level efp dir within a partition + +EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_) : + partitionNum(partitionNum_), + partitionDir(partitionDir_) +{ + validatePartitionDir(); +} + +EmptyFilePoolPartition::~EmptyFilePoolPartition() { + slock l(efpMapMutex); + for (efpMapItr_t i = efpMap.begin(); i != efpMap.end(); ++i) { + delete i->second; + } + efpMap.clear(); +} + +void +EmptyFilePoolPartition::validatePartitionDir() { + if (!jdir::is_dir(partitionDir)) { + std::ostringstream ss; + ss << "Invalid partition directory: \'" << partitionDir << "\' is not a directory"; + throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir"); + } + // TODO: other validity checks here +} + +void +EmptyFilePoolPartition::findEmptyFilePools() { + //std::cout << "Reading " << partitionDir << std::endl; // DEBUG + std::vector dirList; + jdir::read_dir(partitionDir, dirList, true, false, false); + bool foundEfpDir = false; + for (std::vector::iterator i = dirList.begin(); i != dirList.end(); ++i) { + if (i->compare(efpTopLevelDir) == 0) { + foundEfpDir = true; + break; + } + } + if (foundEfpDir) { + std::string efpDir(partitionDir + "/" + efpTopLevelDir); + //std::cout << "Reading " << efpDir << std::endl; // DEBUG + dirList.clear(); + jdir::read_dir(efpDir, dirList, true, false, false); + for (std::vector::iterator i = dirList.begin(); i != dirList.end(); ++i) { + std::string efpSizeDir(efpDir + "/" + (*i)); + EmptyFilePool* efpp = 0; + try { + efpp = new EmptyFilePool(efpSizeDir, this); + { + slock l(efpMapMutex); + efpMap[efpp->fileSizeKib()] = efpp; + } + } + catch (const std::exception& e) { + if (efpp != 0) { + delete efpp; + efpp = 0; + } + //std::cerr << "WARNING: " << e.what() << std::endl; + } + if (efpp != 0) + efpp->initialize(); + } + } +} + +efpPartitionNumber_t +EmptyFilePoolPartition::partitionNumber() const { + return partitionNum; +} + +std::string +EmptyFilePoolPartition::partitionDirectory() const { + return partitionDir; +} + +EmptyFilePool* +EmptyFilePoolPartition::getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb) { + efpMapItr_t i = efpMap.find(efpFileSizeKb); + if (i == efpMap.end()) + return 0; + return i->second; +} + +void +EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector& efpFileSizesKbList) const { + for (efpMapConstItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) { + efpFileSizesKbList.push_back(i->first); + } +} + +void +EmptyFilePoolPartition::getEmptyFilePools(std::vector& efpList) { + for (efpMapItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) { + efpList.push_back(i->second); + } +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h new file mode 100644 index 0000000000..2013884b38 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ +#define QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ + +namespace qpid { +namespace qls_jrnl { + + class EmptyFilePoolPartition; + +}} // namespace qpid::qls_jrnl + +#include "qpid/linearstore/jrnl/EmptyFilePool.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" +#include "qpid/linearstore/jrnl/smutex.h" +#include +#include +#include + +namespace qpid { +namespace qls_jrnl { + +class EmptyFilePoolPartition +{ +public: + static const std::string efpTopLevelDir; +protected: + typedef std::map efpMap_t; + typedef efpMap_t::iterator efpMapItr_t; + typedef efpMap_t::const_iterator efpMapConstItr_t; + + const efpPartitionNumber_t partitionNum; + const std::string partitionDir; + efpMap_t efpMap; + smutex efpMapMutex; + + void validatePartitionDir(); + +public: + EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_); + virtual ~EmptyFilePoolPartition(); + void findEmptyFilePools(); + efpPartitionNumber_t partitionNumber() const; + std::string partitionDirectory() const; + + EmptyFilePool* getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb); + void getEmptyFilePoolSizesKb(std::vector& efpFileSizesKbList) const; + void getEmptyFilePools(std::vector& efpList); +}; + +}} // namespace qpid::qls_jrnl + +#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLPARTITION_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h new file mode 100644 index 0000000000..de91bdc06a --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h @@ -0,0 +1,37 @@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ +#define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ + +#include + +namespace qpid { +namespace qls_jrnl { + + typedef uint64_t efpFileSizeKib_t; + typedef uint32_t efpFileCount_t; + typedef uint16_t efpPartitionNumber_t; + typedef std::pair efpIdentity_t; + +}} // namespace qpid::qls_jrnl + +#endif /* QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp new file mode 100644 index 0000000000..87ecdfb7a8 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/jrnl/JournalFile.h" + +namespace qpid { +namespace qls_jrnl { + +JournalFile::JournalFile(const std::string& fqFileName_) : + fqfn(fqFileName_) +{} + +JournalFile::~JournalFile() {} + +const std::string +JournalFile::directory() const { + return fqfn.substr(0, fqfn.rfind('/')); +} + +const std::string +JournalFile::fileName() const { + return fqfn.substr(fqfn.rfind('/')); +} + +const std::string +JournalFile::fqFileName() const { + return fqfn; +} + +bool +JournalFile::empty() const { + // TODO: return true if no still-enqueued records (or parts of records) exist in this file + return true; +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h new file mode 100644 index 0000000000..de587d94e3 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_JOURNALFILE_H_ +#define QPID_LINEARSTORE_JOURNALFILE_H_ + +#include + +namespace qpid { +namespace qls_jrnl { + +class JournalFile +{ +protected: + const std::string fqfn; +public: + JournalFile(const std::string& fqFileName_); + virtual ~JournalFile(); + + const std::string directory() const; + const std::string fileName() const; + const std::string fqFileName() const; + bool empty() const; +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_JOURNALFILE_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp new file mode 100644 index 0000000000..6167a641c9 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/jrnl/JournalFileController.h" + +#include +#include "qpid/linearstore/jrnl/EmptyFilePool.h" +#include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/JournalFile.h" +#include "qpid/linearstore/jrnl/slock.h" +#include "qpid/linearstore/jrnl/utils/file_hdr.h" + +#include // DEBUG + +namespace qpid { +namespace qls_jrnl { + +JournalFileController::JournalFileController(const std::string& dir_, + EmptyFilePool* efpp_) : + dir(dir_), + efpp(efpp_), + fileSeqCounter(0) +{ + //std::cout << "*** JournalFileController::JournalFileController() dir=" << dir << std::endl; +} + +JournalFileController::~JournalFileController() {} + +void +JournalFileController::pullEmptyFileFromEfp(const uint64_t recId_, + const uint64_t firstRecOffs_, + const std::string& queueName_) { + std::string ef = efpp->takeEmptyFile(dir); + //std::cout << "*** JournalFileController::pullEmptyFileFromEfp() qn=" << queueName_ << " ef=" << ef << std::endl; + const JournalFile* jfp = new JournalFile(ef/*efpp->takeEmptyFile(dir)*/); + initialzeFileHeader(jfp->fqFileName(), recId_, firstRecOffs_, getNextFileSeqNum(), queueName_); + { + slock l(journalFileListMutex); + journalFileList.push_back(jfp); + } +} + +void +JournalFileController::purgeFilesToEfp() { + slock l(journalFileListMutex); + while (journalFileList.front()->empty()) { + + efpp->returnEmptyFile(journalFileList.front()); + delete journalFileList.front(); + journalFileList.pop_front(); + } +} + +void +JournalFileController::finalize() {} + +void +JournalFileController::setFileSeqNum(const uint64_t fileSeqNum) { + fileSeqCounter = fileSeqNum; +} + +// protected + +std::string +JournalFileController::readFileHeader(file_hdr_t* fhdr_, + const std::string& fileName_) { + //std::cout << "*** JournalFileController::readFileHeader() fn=" << fileName_ << std::endl; + char buff[JRNL_SBLK_SIZE]; + std::ifstream ifs(fileName_.c_str(), std::ifstream::in | std::ifstream::binary); + if (ifs.good()) { + ifs.read(buff, JRNL_SBLK_SIZE); + ifs.close(); + std::memcpy(fhdr_, buff, sizeof(file_hdr_t)); + return std::string(buff + sizeof(file_hdr_t), fhdr_->_queue_name_len); + } else { + std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for reading." << std::endl; + } + return std::string(""); +} + +void +JournalFileController::writeFileHeader(const file_hdr_t* fhdr_, + const std::string& queueName_, + const std::string& fileName_) { + //std::cout << "*** JournalFileController::writeFileHeader() qn=" << queueName_ << " fn=" << fileName_ << std::endl; + std::fstream fs(fileName_.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary); + if (fs.good()) { + fs.seekp(0); + fs.write((const char*)fhdr_, sizeof(file_hdr_t)); + fs.write(queueName_.data(), fhdr_->_queue_name_len); + fs.close(); + } else { + std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for writing." << std::endl; + } +} + +void +JournalFileController::resetFileHeader(const std::string& fileName_) { + //std::cout << "*** JournalFileController::resetFileHeader() fn=" << fileName_ << std::endl; + file_hdr_t fhdr; + readFileHeader(&fhdr, fileName_); + ::file_hdr_reset(&fhdr); + writeFileHeader(&fhdr, std::string(""), fileName_); +} + +void +JournalFileController::initialzeFileHeader(const std::string& fileName_, + const uint64_t recId_, + const uint64_t firstRecOffs_, + const uint64_t fileSeqNum_, + const std::string& queueName_) { + //std::cout << "*** JournalFileController::initialzeFileHeader() fn=" << fileName_ << " sn=" << fileSeqNum_ << " qn=" << queueName_ << std::endl; + file_hdr_t fhdr; + readFileHeader(&fhdr, fileName_); + ::file_hdr_init(&fhdr, 0, recId_, firstRecOffs_, fileSeqNum_, queueName_.length(), queueName_.data()); + writeFileHeader(&fhdr, queueName_, fileName_); +} + +uint64_t +JournalFileController::getNextFileSeqNum() { + return __sync_add_and_fetch(&fileSeqCounter, 1); // GCC atomic increment, not portable +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h new file mode 100644 index 0000000000..cd23afd959 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ +#define QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ + +#include +#include "qpid/linearstore/jrnl/smutex.h" + +struct file_hdr_t; +namespace qpid { +namespace qls_jrnl { +class EmptyFilePool; +class JournalFile; +//typedef struct file_hdr_t file_hdr_t; + +class JournalFileController +{ +protected: + typedef std::deque JournalFileList_t; + typedef JournalFileList_t::iterator JournalFileListItr_t; + + const std::string dir; + EmptyFilePool* efpp; + uint64_t fileSeqCounter; + JournalFileList_t journalFileList; + smutex journalFileListMutex; + +public: + JournalFileController(const std::string& dir, + EmptyFilePool* efpp); + virtual ~JournalFileController(); + + void pullEmptyFileFromEfp(const uint64_t recId, const uint64_t firstRecOffs, const std::string& queueName); + void purgeFilesToEfp(); + void finalize(); + void setFileSeqNum(const uint64_t fileSeqNum); + +protected: + std::string readFileHeader(file_hdr_t* fhdr, const std::string& fileName); + void writeFileHeader(const file_hdr_t* fhdr, const std::string& queueName, const std::string& fileName); + void resetFileHeader(const std::string& fileName); + void initialzeFileHeader(const std::string& fileName, const uint64_t recId, const uint64_t firstRecOffs, + const uint64_t fileSeqNum, const std::string& queueName); + uint64_t getNextFileSeqNum(); +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp new file mode 100644 index 0000000000..22a6412c21 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "JournalLog.h" +#include + +namespace qpid { +namespace qls_jrnl { + +JournalLog::JournalLog() {} + +JournalLog::~JournalLog() {} + +void +JournalLog::log(log_level_t ll, const std::string& jid, const std::string& log_stmt) const { + log(ll, jid.c_str(), log_stmt.c_str()); +} + +void +JournalLog::log(log_level_t ll, const char* jid, const char* const log_stmt) const { + if (ll > LOG_ERROR) { + std::cerr << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl; + } else if (ll > LOG_INFO) { + std::cout << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl; + } + +} + +const char* +JournalLog::log_level_str(log_level_t ll) { + switch (ll) + { + case LOG_TRACE: return "TRACE"; + case LOG_DEBUG: return "DEBUG"; + case LOG_INFO: return "INFO"; + case LOG_NOTICE: return "NOTICE"; + case LOG_WARN: return "WARN"; + case LOG_ERROR: return "ERROR"; + case LOG_CRITICAL: return "CRITICAL"; + } + return ""; +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h new file mode 100644 index 0000000000..83ba80d0b5 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_JOURNALLOG_H_ +#define QPID_LINEARSTORE_JOURNALLOG_H_ + +#include + +namespace qpid { +namespace qls_jrnl { + +class JournalLog +{ +public: + typedef enum _log_level + { + LOG_TRACE = 0, + LOG_DEBUG, + LOG_INFO, + LOG_NOTICE, + LOG_WARN, + LOG_ERROR, + LOG_CRITICAL + } log_level_t; + +protected: + JournalLog(); + virtual ~JournalLog(); + +public: + virtual void log(log_level_t level, const std::string& jid, const std::string& log_stmt) const; + virtual void log(log_level_t level, const char* jid, const char* const log_stmt) const; + static const char* log_level_str(log_level_t ll); +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_JOURNALLOG_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h index fffb8c07d7..31fa4e6ba3 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_ENUMS_H -#define QPID_LEGACYSTORE_JRNL_ENUMS_H +#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H +#define QPID_LINEARSTORE_JRNL_ENUMS_H namespace qpid { @@ -64,6 +64,7 @@ namespace qls_jrnl return ""; } +/* enum _log_level { LOG_TRACE = 0, @@ -74,9 +75,9 @@ namespace qls_jrnl LOG_ERROR, LOG_CRITICAL }; - typedef _log_level log_level; + typedef _log_level log_level_t; - static inline const char* log_level_str(log_level ll) + static inline const char* log_level_str(log_level_t ll) { switch (ll) { @@ -90,8 +91,9 @@ namespace qls_jrnl } return ""; } +*/ }} -#endif // ifndef QPID_LEGACYSTORE_JRNL_ENUMS_H +#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h index fc44e35331..110fee1334 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h @@ -40,7 +40,6 @@ #endif */ - /** * Rule: Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that *
@@ -51,6 +50,7 @@
 #define JRNL_DBLK_SIZE          128         /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
 #define JRNL_SBLK_SIZE_DBLKS    32          /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */
 #define JRNL_SBLK_SIZE          JRNL_SBLK_SIZE_DBLKS * JRNL_DBLK_SIZE        /**< Disk softblock size in bytes */
+#define JRNL_SBLK_SIZE_KIB      JRNL_SBLK_SIZE / 1024 /**< Disk softblock size in KiB */
 //#define JRNL_MIN_FILE_SIZE      128         ///< Min. jrnl file size in sblks (excl. file_hdr)
 //#define JRNL_MAX_FILE_SIZE      4194176     ///< Max. jrnl file size in sblks (excl. file_hdr)
 //#define JRNL_MIN_NUM_FILES      4           ///< Min. number of journal files
@@ -68,7 +68,7 @@
 //
 //#define JRNL_INFO_EXTENSION     "jinf"      ///< Extension for journal info files
 //#define JRNL_DATA_EXTENSION     "jdat"      ///< Extension for journal data files
-#define QLS_JRNL_FILE_EXTENSION "jdat"      /**< Extension for journal data files */
+#define QLS_JRNL_FILE_EXTENSION ".jrnl"     /**< Extension for journal data files */
 //#define RHM_JDAT_TXA_MAGIC      0x614d4852  ///< ("RHMa" in little endian) Magic for dtx abort hdrs
 #define QLS_TXA_MAGIC           0x61534c51  /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
 //#define RHM_JDAT_TXC_MAGIC      0x634d4852  ///< ("RHMc" in little endian) Magic for dtx commit hdrs
@@ -83,6 +83,7 @@
 #define QLS_EMPTY_MAGIC         0x78534c51  /**< ("QLSx" in little endian) Magic for empty dblk */
 //#define RHM_JDAT_VERSION        0x01        ///< Version (of file layout)
 #define QLS_JRNL_VERSION        0x0002      /**< Version (of file layout) */
+#define QLS_JRNL_FHDRSIZESBLKS  0x0001      /**< Journal file header size in sblks (as defined by JRNL_SBLK_SIZE) */
 //#define RHM_CLEAN_CHAR          0xff        ///< Char used to clear empty space on disk
 #define QLS_CLEAN_CHAR          0xff        ///< Char used to clear empty space on disk
 //
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
index 552aa92b9c..126d3f7da3 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
@@ -29,9 +29,11 @@
 #include 
 #include 
 #include 
+#include 
 //#include "qpid/linearstore/jrnl/file_hdr.h"
 #include "qpid/linearstore/jrnl/jerrno.h"
 //#include "qpid/linearstore/jrnl/jinf.h"
+#include "qpid/linearstore/jrnl/JournalFileController.h"
 #include 
 #include 
 #include 
@@ -62,15 +64,16 @@ bool jcntl::init_statics()
 
 // Functions
 
-jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename):
+jcntl::jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/):
     _jid(jid),
-    _jdir(jdir, base_filename),
-    _base_filename(base_filename),
+    _jdir(jdir/*, base_filename*/),
+//    _base_filename(base_filename),
     _init_flag(false),
     _stop_flag(false),
     _readonly_flag(false),
-    _autostop(true),
-    _jfsize_sblks(0),
+//    _autostop(true),
+    _jfcp(0),
+//    _jfsize_sblks(0),
 //    _lpmgr(),
     _emap(),
     _tmap(),
@@ -87,11 +90,16 @@ jcntl::~jcntl()
         try { stop(true); }
         catch (const jexception& e) { std::cerr << e << std::endl; }
 //    _lpmgr.finalize();
+    if (_jfcp) {
+        _jfcp->finalize();
+        delete _jfcp;
+        _jfcp = 0;
+    }
 }
 
 void
 jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles,
-        const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
+        const uint32_t jfsize_sblks,*/ EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
         aio_callback* const cbp)
 {
     _init_flag = false;
@@ -101,6 +109,12 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_
     _emap.clear();
     _tmap.clear();
 
+    if (_jfcp) {
+        _jfcp->finalize();
+        delete _jfcp;
+        _jfcp = 0;
+    }
+
 //    _lpmgr.finalize();
 
     // Set new file geometry parameters
@@ -117,6 +131,8 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_
     _jdir.clear_dir();
 //    _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl);
 
+    _jfcp = new JournalFileController(_jdir.dirname(), efpp);
+    _jfcp->pullEmptyFileFromEfp(1, 4096, _jid);
 //    _wrfc.initialize(_jfsize_sblks);
 //    _rrfc.initialize();
 //    _rrfc.set_findex(0);
@@ -159,7 +175,7 @@ jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max
     highest_rid = _rcvdat._h_rid;
     if (_rcvdat._jfull)
         throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
-    this->log(LOG_DEBUG, _rcvdat.to_log(_jid));
+    this->log(LOG_DEBUG, _jid, _rcvdat.to_log(_jid));
 
 //    _lpmgr.recover(_rcvdat, this, &new_fcntl);
 
@@ -192,6 +208,7 @@ void
 jcntl::delete_jrnl_files()
 {
     stop(true); // wait for AIO to complete
+    _jfcp->purgeFilesToEfp();
     _jdir.delete_dir();
 }
 
@@ -407,20 +424,22 @@ jcntl::flush(const bool block_till_aio_cmpl)
     return res;
 }
 
+/*
 void
-jcntl::log(log_level ll, const std::string& log_stmt) const
+jcntl::log(log_level_t ll, const std::string& log_stmt) const
 {
     log(ll, log_stmt.c_str());
 }
 
 void
-jcntl::log(log_level ll, const char* const log_stmt) const
+jcntl::log(log_level_t ll, const char* const log_stmt) const
 {
     if (ll > LOG_INFO)
     {
         std::cout << log_level_str(ll) << ": Journal \"" << _jid << "\": " << log_stmt << std::endl;
     }
 }
+*/
 
 /*
 void
@@ -523,7 +542,7 @@ jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
             {
                 std::ostringstream oss;
                 oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
-                this->log(LOG_CRITICAL, oss.str());
+                this->log(LOG_CRITICAL, _jid, oss.str());
                 throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
             }
         }
@@ -877,8 +896,8 @@ jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& r
     if (!ifsp->is_open())
     {
         std::ostringstream oss;
-        oss << _jdir.dirname() << "/" << _base_filename << ".";
-        oss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << QLS_JRNL_FILE_EXTENSION;
+        oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO - linear journal name
+        oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION;
         ifsp->clear(); // clear eof flag, req'd for older versions of c++
         ifsp->open(oss.str().c_str(), std::ios_base::in | std::ios_base::binary);
         if (!ifsp->good())
@@ -946,12 +965,12 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv
             oss << std::hex << "Bad record alignment found at fid=0x" << fid;
             oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
             oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required.";
-            this->log(LOG_WARN, oss.str());
+            this->log(LOG_WARN, _jid, oss.str());
         }
         const uint32_t xmagic = QLS_EMPTY_MAGIC;
         std::ostringstream oss;
-        oss << _jdir.dirname() << "/" << _base_filename << ".";
-        oss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << QLS_JRNL_FILE_EXTENSION;
+        oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO linear journal name
+        oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION;
         std::ofstream ofsp(oss.str().c_str(),
                 std::ios_base::in | std::ios_base::out | std::ios_base::binary);
         if (!ofsp.good())
@@ -971,7 +990,7 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv
             assert(!ofsp.fail());
             std::ostringstream oss;
             oss << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos;
-            this->log(LOG_NOTICE, oss.str());
+            this->log(LOG_NOTICE, _jid, oss.str());
             file_pos = ofsp.tellp();
         }
         ofsp.close();
@@ -979,7 +998,7 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv
         rd._lfid = fid;
 //        if (!rd._frot)
 //            rd._ffid = (fid + 1) % rd._njf;
-        this->log(LOG_INFO, "Bad record alignment fixed.");
+        this->log(LOG_INFO, _jid, "Bad record alignment fixed.");
     }
     rd._eo = file_pos;
 }
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
index debcc02efc..8a2c178f67 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
@@ -31,6 +31,7 @@ namespace qls_jrnl
 
 #include 
 #include 
+#include 
 #include "qpid/linearstore/jrnl/jdir.h"
 //#include "qpid/linearstore/jrnl/fcntl.h"
 //#include "qpid/linearstore/jrnl/lpmgr.h"
@@ -45,6 +46,8 @@ namespace qpid
 {
 namespace qls_jrnl
 {
+class EmptyFilePool;
+class JournalFileController;
 
     /**
     * \brief Access and control interface for the journal. This is the top-level class for the
@@ -56,7 +59,7 @@ namespace qls_jrnl
     * which is used per data block written to the journal, and is used to track its status through
     * the AIO enqueue, read and dequeue process.
     */
-    class jcntl
+    class jcntl : public JournalLog
     {
     protected:
         /**
@@ -85,7 +88,7 @@ namespace qls_jrnl
         * that will be written to disk. No file separator characters should be included here, but
         * all other legal filename characters are valid.
         */
-        std::string _base_filename;
+//        std::string _base_filename;
 
         /**
         * \brief Initialized flag
@@ -121,10 +124,11 @@ namespace qls_jrnl
         *     marker. If not set, then attempts to write will throw exceptions until the journal
         *     file low water marker moves to the next journal file.
         */
-        bool _autostop;             ///< Autostop flag - stops journal when overrun occurs
+        //bool _autostop;             ///< Autostop flag - stops journal when overrun occurs
 
         // Journal control structures
-        uint32_t _jfsize_sblks;    ///< Journal file size in sblks
+        JournalFileController* _jfcp;///< Journal File Controller
+        //uint32_t _jfsize_sblks;    ///< Journal file size in sblks
         //lpmgr _lpmgr;               ///< LFID-PFID manager tracks inserted journal files
         enq_map _emap;              ///< Enqueue map for low water mark management
         txn_map _tmap;              ///< Transaction map open transactions
@@ -148,7 +152,7 @@ namespace qls_jrnl
         * \param jdir The directory which will contain the journal files.
         * \param base_filename The string which will be used to start all journal filenames.
         */
-        jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename);
+        jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/);
 
         /**
         * \brief Destructor.
@@ -190,7 +194,7 @@ namespace qls_jrnl
         * \exception TODO
         */
         void initialize(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles,
-                const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
+                const uint32_t jfsize_sblks,*/EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
                 aio_callback* const cbp);
 
         /**
@@ -628,17 +632,17 @@ namespace qls_jrnl
         * the same directory, their base filenames MUST be different or else the instances
         * will overwrite one another.
         */
-        inline const std::string& base_filename() const { return _base_filename; }
+//        inline const std::string& base_filename() const { return _base_filename; }
 
 //        inline uint16_t num_jfiles() const; { return _lpmgr.num_jfiles(); }
 
 //        inline fcntl* get_fcntlp(const uint16_t lfid) const { return _lpmgr.get_fcntlp(lfid); }
 
-        inline uint32_t jfsize_sblks() const { return _jfsize_sblks; }
+//        inline uint32_t jfsize_sblks() const { return _jfsize_sblks; }
 
         // Logging
-        virtual void log(log_level level, const std::string& log_stmt) const;
-        virtual void log(log_level level, const char* const log_stmt) const;
+//        virtual void log(log_level_t level, const std::string& log_stmt) const;
+//        virtual void log(log_level_t level, const char* const log_stmt) const;
 
         // FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:
         //void chk_wr_frot();
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
index a191ed86d9..89bce926e0 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
@@ -37,9 +37,9 @@ namespace qpid
 namespace qls_jrnl
 {
 
-jdir::jdir(const std::string& dirname, const std::string& _base_filename):
-        _dirname(dirname),
-        _base_filename(_base_filename)
+jdir::jdir(const std::string& dirname/*, const std::string& _base_filename*/):
+        _dirname(dirname)/*,
+        _base_filename(_base_filename)*/
 {}
 
 jdir::~jdir()
@@ -88,21 +88,22 @@ jdir::create_dir(const std::string& dirname)
 void
 jdir::clear_dir(const bool create_flag)
 {
-    clear_dir(_dirname, _base_filename, create_flag);
+    clear_dir(_dirname/*, _base_filename*/, create_flag);
 }
 
 void
-jdir::clear_dir(const char* dirname, const char* base_filename, const bool create_flag)
+jdir::clear_dir(const char* dirname/*, const char* base_filename*/, const bool create_flag)
 {
-    clear_dir(std::string(dirname), std::string(base_filename), create_flag);
+    clear_dir(std::string(dirname)/*, std::string(base_filename)*/, create_flag);
 }
 
 
 void
-jdir::clear_dir(const std::string& dirname, const std::string&
+jdir::clear_dir(const std::string& dirname/*, const std::string&
 #ifndef RHM_JOWRITE
         base_filename
 #endif
+*/
         , const bool create_flag)
 {
     DIR* dir = ::opendir(dirname.c_str());
@@ -117,7 +118,7 @@ jdir::clear_dir(const std::string& dirname, const std::string&
         oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "clear_dir");
     }
-#ifndef RHM_JOWRITE
+//#ifndef RHM_JOWRITE
     struct dirent* entry;
     bool found = false;
     std::string bak_dir;
@@ -126,13 +127,13 @@ jdir::clear_dir(const std::string& dirname, const std::string&
         // Ignore . and ..
         if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
         {
-            if (std::strlen(entry->d_name) > base_filename.size())
+            if (std::strlen(entry->d_name) >= 3) // 'bak'
             {
-                if (std::strncmp(entry->d_name, base_filename.c_str(), base_filename.size()) == 0)
+                if (std::strncmp(entry->d_name, "bak", 3) == 0)
                 {
                     if (!found)
                     {
-                        bak_dir = create_bak_dir(dirname, base_filename);
+                        bak_dir = create_bak_dir(dirname/*, base_filename*/);
                         found = true;
                     }
                     std::ostringstream oldname;
@@ -154,16 +155,16 @@ jdir::clear_dir(const std::string& dirname, const std::string&
 // FIXME: Find out why this fails with false alarms/errors from time to time...
 // While commented out, there is no error capture from reading dir entries.
 //    check_err(errno, dir, dirname, "clear_dir");
-#endif
+//#endif
     close_dir(dir, dirname, "clear_dir");
 }
 
 // === push_down ===
 
 std::string
-jdir::push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base)
+jdir::push_down(const std::string& dirname, const std::string& target_dir/*, const std::string& bak_dir_base*/)
 {
-    std::string bak_dir_name = create_bak_dir(dirname, bak_dir_base);
+    std::string bak_dir_name = create_bak_dir(dirname/*, bak_dir_base*/);
 
     DIR* dir = ::opendir(dirname.c_str());
     if (!dir)
@@ -202,18 +203,18 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const
 void
 jdir::verify_dir()
 {
-    verify_dir(_dirname, _base_filename);
+    verify_dir(_dirname/*, _base_filename*/);
 }
 
 void
-jdir::verify_dir(const char* dirname, const char* base_filename)
+jdir::verify_dir(const char* dirname/*, const char* base_filename*/)
 {
-    verify_dir(std::string(dirname), std::string(base_filename));
+    verify_dir(std::string(dirname)/*, std::string(base_filename)*/);
 }
 
 
 void
-jdir::verify_dir(const std::string& dirname, const std::string& /*base_filename*/)
+jdir::verify_dir(const std::string& dirname/*, const std::string& base_filename*/)
 {
     if (!is_dir(dirname))
     {
@@ -323,7 +324,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
 
 
 std::string
-jdir::create_bak_dir(const std::string& dirname, const std::string& base_filename)
+jdir::create_bak_dir(const std::string& dirname)
 {
     DIR* dir = ::opendir(dirname.c_str());
     long dir_num = 0L;
@@ -339,13 +340,11 @@ jdir::create_bak_dir(const std::string& dirname, const std::string& base_filenam
         // Ignore . and ..
         if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
         {
-            if (std::strlen(entry->d_name) == base_filename.size() + 10) // Format: basename.bak.XXXX
+            if (std::strlen(entry->d_name) == 9) // Format: _bak.XXXX
             {
-                std::ostringstream oss;
-                oss << "_" << base_filename << ".bak.";
-                if (std::strncmp(entry->d_name, oss.str().c_str(), base_filename.size() + 6) == 0)
+                if (std::strncmp(entry->d_name, "_bak.", 5) == 0)
                 {
-                    long this_dir_num = std::strtol(entry->d_name + base_filename.size() + 6, 0, 16);
+                    long this_dir_num = std::strtol(entry->d_name + 5, 0, 16);
                     if (this_dir_num > dir_num)
                         dir_num = this_dir_num;
                 }
@@ -358,8 +357,7 @@ jdir::create_bak_dir(const std::string& dirname, const std::string& base_filenam
     close_dir(dir, dirname, "create_bak_dir");
 
     std::ostringstream dn;
-    dn << dirname << "/_" << base_filename << ".bak." << std::hex << std::setw(4) <<
-            std::setfill('0') << ++dir_num;
+    dn << dirname << "/_bak." << std::hex << std::setw(4) << std::setfill('0') << ++dir_num;
     if (::mkdir(dn.str().c_str(), S_IRWXU | S_IRWXG | S_IROTH))
     {
         std::ostringstream oss;
@@ -410,6 +408,32 @@ jdir::exists(const std::string& name)
     return exists(name.c_str());
 }
 
+void
+jdir::read_dir(const std::string& name, std::vector& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links) {
+    struct stat s;
+    if (is_dir(name)) {
+        DIR* dir = ::opendir(name.c_str());
+        if (dir != 0) {
+            struct dirent* entry;
+            while ((entry = ::readdir(dir)) != 0) {
+                if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { // Ignore . and ..
+                    std::string full_name(name + "/" + entry->d_name);
+                    if (::stat(full_name.c_str(), &s))
+                    {
+                        ::closedir(dir);
+                        std::ostringstream oss;
+                        oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
+                        throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
+                    }
+                    if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links))
+                        dir_list.push_back(entry->d_name);
+                }
+            }
+        }
+        close_dir(dir, name, "read_dir");
+    }
+}
+
 void
 jdir::check_err(const int err_num, DIR* dir, const std::string& dir_name, const std::string& fn_name)
 {
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
index af4797f44c..53519b1f2c 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
@@ -19,8 +19,8 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_JDIR_H
-#define QPID_LEGACYSTORE_JRNL_JDIR_H
+#ifndef QPID_LINEARSTORE_JRNL_JDIR_H
+#define QPID_LINEARSTORE_JRNL_JDIR_H
 
 namespace qpid
 {
@@ -32,6 +32,7 @@ class jdir;
 //#include "qpid/linearstore/jrnl/jinf.h"
 #include 
 #include 
+#include 
 
 namespace qpid
 {
@@ -46,7 +47,7 @@ namespace qls_jrnl
     {
     private:
         std::string _dirname;
-        std::string _base_filename;
+        //std::string _base_filename;
 
     public:
 
@@ -57,7 +58,7 @@ namespace qls_jrnl
         * \param base_filename Filename root used in the creation of %journal files
         *     and sub-directories.
         */
-        jdir(const std::string& dirname, const std::string& base_filename);
+        jdir(const std::string& dirname/*, const std::string& base_filename*/);
 
         virtual ~jdir();
 
@@ -119,7 +120,7 @@ namespace qls_jrnl
         *     directory failed.
         * \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed.
         */
-        static void clear_dir(const char* dirname, const char* base_filename,
+        static void clear_dir(const char* dirname/*, const char* base_filename*/,
                 const bool create_flag = true);
 
         /**
@@ -137,7 +138,7 @@ namespace qls_jrnl
         *     directory failed.
         * \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed.
         */
-        static void clear_dir(const std::string& dirname, const std::string& base_filename,
+        static void clear_dir(const std::string& dirname/*, const std::string& base_filename*/,
                 const bool create_flag = true);
 
 
@@ -152,7 +153,7 @@ namespace qls_jrnl
          * \param bak_dir_base Base name for backup directory to be created in dirname, into which target_dir will be moved.
          * \return Name of backup dir into which target_dir was pushed.
          */
-        static std::string push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base);
+        static std::string push_down(const std::string& dirname, const std::string& target_dir/*, const std::string& bak_dir_base*/);
 
 
         /**
@@ -184,7 +185,7 @@ namespace qls_jrnl
         * \exception jerrno::JERR_JINF_CVALIDFAIL Error validating %jinf file
         * \exception jerrno::JERR_JDIR_NOSUCHFILE Expected jdat file is missing
         */
-        static void verify_dir(const char* dirname, const char* base_filename);
+        static void verify_dir(const char* dirname/*, const char* base_filename*/);
 
         /**
         * \brief Verify that dirname is a valid %journal directory.
@@ -201,7 +202,7 @@ namespace qls_jrnl
         * \exception jerrno::JERR_JINF_CVALIDFAIL Error validating %jinf file
         * \exception jerrno::JERR_JDIR_NOSUCHFILE Expected jdat file is missing
         */
-        static void verify_dir(const std::string& dirname, const std::string& base_filename);
+        static void verify_dir(const std::string& dirname/*, const std::string& base_filename*/);
 
         /**
         * \brief Delete the %journal directory and all files and sub--directories that it may
@@ -273,8 +274,8 @@ namespace qls_jrnl
         * \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed.
         * \exception jerrno::JERR_JDIR_MKDIR The backup directory could not be deleted.
         */
-        static std::string create_bak_dir(const std::string& dirname,
-                const std::string& base_filename);
+        static std::string create_bak_dir(const std::string& dirname/*,
+                const std::string& base_filename*/);
 
         /**
         * \brief Return the directory name as a string.
@@ -284,7 +285,7 @@ namespace qls_jrnl
         /**
         * \brief Return the %journal base filename name as a string.
         */
-        inline const std::string& base_filename() const { return _base_filename; }
+//        inline const std::string& base_filename() const { return _base_filename; }
 
         /**
         * \brief Test whether the named file is a directory.
@@ -335,6 +336,8 @@ namespace qls_jrnl
         */
         static bool exists(const std::string& name);
 
+        static void read_dir(const std::string& name, std::vector& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links);
+
         /**
         * \brief Stream operator
         */
@@ -363,4 +366,4 @@ namespace qls_jrnl
 
 }}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JDIR_H
+#endif // ifndef QPID_LINEARSTORE_JRNL_JDIR_H
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
index 718315a638..ecd5469d8b 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
@@ -31,39 +31,39 @@ std::map::iterator jerrno::_err_map_itr;
 bool jerrno::_initialized = jerrno::__init();
 
 // generic errors
-const uint32_t jerrno::JERR__MALLOC            = 0x0100;
-const uint32_t jerrno::JERR__UNDERFLOW         = 0x0101;
-const uint32_t jerrno::JERR__NINIT             = 0x0102;
-const uint32_t jerrno::JERR__AIO               = 0x0103;
-const uint32_t jerrno::JERR__FILEIO            = 0x0104;
-const uint32_t jerrno::JERR__RTCLOCK           = 0x0105;
-const uint32_t jerrno::JERR__PTHREAD           = 0x0106;
-const uint32_t jerrno::JERR__TIMEOUT           = 0x0107;
-const uint32_t jerrno::JERR__UNEXPRESPONSE     = 0x0108;
-const uint32_t jerrno::JERR__RECNFOUND         = 0x0109;
-const uint32_t jerrno::JERR__NOTIMPL           = 0x010a;
+const uint32_t jerrno::JERR__MALLOC              = 0x0100;
+const uint32_t jerrno::JERR__UNDERFLOW           = 0x0101;
+const uint32_t jerrno::JERR__NINIT               = 0x0102;
+const uint32_t jerrno::JERR__AIO                 = 0x0103;
+const uint32_t jerrno::JERR__FILEIO              = 0x0104;
+const uint32_t jerrno::JERR__RTCLOCK             = 0x0105;
+const uint32_t jerrno::JERR__PTHREAD             = 0x0106;
+const uint32_t jerrno::JERR__TIMEOUT             = 0x0107;
+const uint32_t jerrno::JERR__UNEXPRESPONSE       = 0x0108;
+const uint32_t jerrno::JERR__RECNFOUND           = 0x0109;
+const uint32_t jerrno::JERR__NOTIMPL             = 0x010a;
 
 // class jcntl
-const uint32_t jerrno::JERR_JCNTL_STOPPED      = 0x0200;
-const uint32_t jerrno::JERR_JCNTL_READONLY     = 0x0201;
-const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT  = 0x0202;
-const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203;
-const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204;
-const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL = 0x0205;
-const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH  = 0x0206;
+const uint32_t jerrno::JERR_JCNTL_STOPPED        = 0x0200;
+const uint32_t jerrno::JERR_JCNTL_READONLY       = 0x0201;
+const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT    = 0x0202;
+const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC   = 0x0203;
+const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED   = 0x0204;
+const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL   = 0x0205;
+const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH    = 0x0206;
 
 // class jdir
-const uint32_t jerrno::JERR_JDIR_NOTDIR        = 0x0300;
-const uint32_t jerrno::JERR_JDIR_MKDIR         = 0x0301;
-const uint32_t jerrno::JERR_JDIR_OPENDIR       = 0x0302;
-const uint32_t jerrno::JERR_JDIR_READDIR       = 0x0303;
-const uint32_t jerrno::JERR_JDIR_CLOSEDIR      = 0x0304;
-const uint32_t jerrno::JERR_JDIR_RMDIR         = 0x0305;
-const uint32_t jerrno::JERR_JDIR_NOSUCHFILE    = 0x0306;
-const uint32_t jerrno::JERR_JDIR_FMOVE         = 0x0307;
-const uint32_t jerrno::JERR_JDIR_STAT          = 0x0308;
-const uint32_t jerrno::JERR_JDIR_UNLINK        = 0x0309;
-const uint32_t jerrno::JERR_JDIR_BADFTYPE      = 0x030a;
+const uint32_t jerrno::JERR_JDIR_NOTDIR          = 0x0300;
+const uint32_t jerrno::JERR_JDIR_MKDIR           = 0x0301;
+const uint32_t jerrno::JERR_JDIR_OPENDIR         = 0x0302;
+const uint32_t jerrno::JERR_JDIR_READDIR         = 0x0303;
+const uint32_t jerrno::JERR_JDIR_CLOSEDIR        = 0x0304;
+const uint32_t jerrno::JERR_JDIR_RMDIR           = 0x0305;
+const uint32_t jerrno::JERR_JDIR_NOSUCHFILE      = 0x0306;
+const uint32_t jerrno::JERR_JDIR_FMOVE           = 0x0307;
+const uint32_t jerrno::JERR_JDIR_STAT            = 0x0308;
+const uint32_t jerrno::JERR_JDIR_UNLINK          = 0x0309;
+const uint32_t jerrno::JERR_JDIR_BADFTYPE        = 0x030a;
 
 // class fcntl
 //const uint32_t jerrno::JERR_FCNTL_OPENWR       = 0x0400;
@@ -82,31 +82,31 @@ const uint32_t jerrno::JERR_JDIR_BADFTYPE      = 0x030a;
 //const uint32_t jerrno::JERR_RRFC_OPENRD        = 0x0600;
 
 // class jrec, enq_rec, deq_rec, txn_rec
-const uint32_t jerrno::JERR_JREC_BADRECHDR     = 0x0700;
-const uint32_t jerrno::JERR_JREC_BADRECTAIL    = 0x0701;
+const uint32_t jerrno::JERR_JREC_BADRECHDR       = 0x0700;
+const uint32_t jerrno::JERR_JREC_BADRECTAIL      = 0x0701;
 
 // class wmgr
-const uint32_t jerrno::JERR_WMGR_BADPGSTATE    = 0x0801;
-const uint32_t jerrno::JERR_WMGR_BADDTOKSTATE  = 0x0802;
-const uint32_t jerrno::JERR_WMGR_ENQDISCONT    = 0x0803;
-const uint32_t jerrno::JERR_WMGR_DEQDISCONT    = 0x0804;
-const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ  = 0x0805;
+const uint32_t jerrno::JERR_WMGR_BADPGSTATE      = 0x0801;
+const uint32_t jerrno::JERR_WMGR_BADDTOKSTATE    = 0x0802;
+const uint32_t jerrno::JERR_WMGR_ENQDISCONT      = 0x0803;
+const uint32_t jerrno::JERR_WMGR_DEQDISCONT      = 0x0804;
+const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ    = 0x0805;
 
 // class rmgr
-const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC  = 0x0900;
-const uint32_t jerrno::JERR_RMGR_RIDMISMATCH   = 0x0901;
+const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC    = 0x0900;
+const uint32_t jerrno::JERR_RMGR_RIDMISMATCH     = 0x0901;
 //const uint32_t jerrno::JERR_RMGR_FIDMISMATCH   = 0x0902;
-const uint32_t jerrno::JERR_RMGR_ENQSTATE      = 0x0903;
-const uint32_t jerrno::JERR_RMGR_BADRECTYPE    = 0x0904;
+const uint32_t jerrno::JERR_RMGR_ENQSTATE        = 0x0903;
+const uint32_t jerrno::JERR_RMGR_BADRECTYPE      = 0x0904;
 
 // class data_tok
-const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE  = 0x0a00;
+const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE    = 0x0a00;
 // const uint32_t jerrno::JERR_DTOK_RIDNOTSET     = 0x0a01;
 
 // class enq_map, txn_map
-const uint32_t jerrno::JERR_MAP_DUPLICATE      = 0x0b00;
-const uint32_t jerrno::JERR_MAP_NOTFOUND       = 0x0b01;
-const uint32_t jerrno::JERR_MAP_LOCKED         = 0x0b02;
+const uint32_t jerrno::JERR_MAP_DUPLICATE        = 0x0b00;
+const uint32_t jerrno::JERR_MAP_NOTFOUND         = 0x0b01;
+const uint32_t jerrno::JERR_MAP_LOCKED           = 0x0b02;
 
 // class jinf
 //const uint32_t jerrno::JERR_JINF_CVALIDFAIL    = 0x0c00;
@@ -121,6 +121,13 @@ const uint32_t jerrno::JERR_MAP_LOCKED         = 0x0b02;
 //const uint32_t jerrno::JERR_JINF_OWIBAD        = 0x0c09;
 //const uint32_t jerrno::JERR_JINF_ZEROLENFILE   = 0x0c0a;
 
+// EFP errors
+const uint32_t jerrno::JERR_EFP_BADPARTITIONNAME = 0x0d01;
+const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR  = 0x0d02;
+const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME    = 0x0d03;
+const uint32_t jerrno::JERR_EFP_NOEFP            = 0x0d04;
+const uint32_t jerrno::JERR_EFP_EMPTY        = 0x0d05;
+
 // Negative returns for some functions
 const int32_t jerrno::AIO_TIMEOUT               = -1;
 const int32_t jerrno::LOCK_TAKEN                = -2;
@@ -222,6 +229,13 @@ jerrno::__init()
 //    _err_map[JERR_JINF_OWIBAD] = "JERR_JINF_OWIBAD: Journal data files have inconsistent OWI flags; >1 transition found in non-auto-expand or min-size journal";
 //    _err_map[JERR_JINF_ZEROLENFILE] = "JERR_JINF_ZEROLENFILE: Journal info file zero length";
 
+    // EFP errors
+    _err_map[JERR_EFP_BADPARTITIONNAME] = "JERR_EFP_BADPARTITIONNAME: Invalid partition name (must be \'pNNN\' where NNN is a non-zero number)";
+    _err_map[JERR_EFP_BADEFPDIRNAME] = "JERR_EFP_BADEFPDIRNAME: Bad Empty File Pool directory name (must be \'NNNk\', where NNN is a number which is a multiple of 4)";
+    _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory";
+    _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size";
+    _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
+
     //_err_map[] = "";
 
     return true;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
index 19b9954c93..2ebabd84b8 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
@@ -140,6 +140,13 @@ namespace qls_jrnl
 //        static const uint32_t JERR_JINF_OWIBAD;        ///< OWI inconsistent (>1 transition in non-ae journal)
 //        static const uint32_t JERR_JINF_ZEROLENFILE;   ///< Journal info file is zero length (empty).
 
+        // EFP errors
+        static const uint32_t JERR_EFP_BADPARTITIONNAME;  ///< Partition name invalid or of value 0
+        static const uint32_t JERR_EFP_BADEFPDIRNAME;     ///< Empty File Pool directory name invalid
+        static const uint32_t JERR_EFP_BADPARTITIONDIR;   ///< Invalid partition directory
+        static const uint32_t JERR_EFP_NOEFP;             ///< No EFP found for given partition and file size
+        static const uint32_t JERR_EFP_EMPTY;             ///< EFP empty
+
         // Negative returns for some functions
         static const int32_t AIO_TIMEOUT;               ///< Timeout waiting for AIO return
         static const int32_t LOCK_TAKEN;                ///< Attempted to take lock, but it was taken by another thread
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
index b6d0f0f2ad..9722b78e81 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
@@ -61,7 +61,7 @@ rmgr::initialize(aio_callback* const cbp)
         throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
     }
     _fhdr_aio_cb_ptr = new aio_cb;
-    std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*));
+    std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb));
 }
 
 void
@@ -638,6 +638,7 @@ rmgr::init_aio_reads(const int16_t /*first_uninit*/, const uint16_t /*num_uninit
 void
 rmgr::rotate_page()
 {
+/*
     _page_cb_arr[_pg_index]._rdblks = 0;
     _page_cb_arr[_pg_index]._state = UNUSED;
     if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS)
@@ -654,6 +655,7 @@ rmgr::rotate_page()
     // Need to move reset into if (_rrfc.file_rotate()) above.
     if (_pg_cntr >= (_jc->jfsize_sblks() / JRNL_RMGR_PAGE_SIZE))
         _pg_cntr = 0;
+*/
 }
 
 uint32_t
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
index e8263ea281..e8ebe9db94 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
@@ -20,28 +20,69 @@
  */
 
 #include "file_hdr.h"
+#include 
 
-int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
-                   const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size,
-                   const uint64_t file_number) {
-    rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
+                    const uint16_t efp_partition, const uint64_t file_size) {
+    rec_hdr_init(&dest->_rhdr, magic, version, 0, 0);
+    dest->_fhdr_size_sblks = fhdr_size_sblks;
+    dest->_efp_partition = efp_partition;
+    dest->_reserved = 0;
+    dest->_file_size_kib = file_size;
+    dest->_fro = 0;
+    dest->_ts_nsec = 0;
+    dest->_ts_sec = 0;
+    dest->_file_number = 0;
+    dest->_queue_name_len = 0;
+}
+
+int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+                  const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
+    dest->_rhdr._uflag = uflag;
+    dest->_rhdr._rid = rid;
     dest->_fro = fro;
-    dest->_file_count = file_count;
-    dest->_file_size = file_size;
     dest->_file_number = file_number;
+    if (sizeof(file_hdr_t) + queue_name_len < MAX_FILE_HDR_LEN) {
+        dest->_queue_name_len = queue_name_len;
+    } else {
+        dest->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t);
+    }
+    dest->_queue_name_len = queue_name_len;
+    memcpy(dest + sizeof(file_hdr_t), queue_name, queue_name_len);
     return set_time_now(dest);
 }
 
 void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) {
     rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+    dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied?
+    dest->_efp_partition = src->_efp_partition;     // Should this be copied?
+    dest->_file_size_kib = src->_file_size_kib;
     dest->_fro = src->_fro;
     dest->_ts_sec = src->_ts_sec;
     dest->_ts_nsec = src->_ts_nsec;
-    dest->_file_count = src->_file_count;
-    dest->_file_size = src->_file_size;
     dest->_file_number = src->_file_number;
 }
 
+void file_hdr_reset(file_hdr_t* target) {
+    target->_rhdr._uflag = 0;
+    target->_rhdr._rid = 0;
+    target->_fro = 0;
+    target->_ts_sec = 0;
+    target->_ts_nsec = 0;
+    target->_file_number = 0;
+    target->_queue_name_len = 0;
+    memset(target + sizeof(file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(file_hdr_t));
+}
+
+int is_file_hdr_reset(file_hdr_t* target) {
+    return target->_rhdr._uflag == 0 &&
+           target->_rhdr._rid == 0 &&
+           target->_ts_sec == 0 &&
+           target->_ts_nsec == 0 &&
+           target->_file_number == 0 &&
+           target->_queue_name_len == 0;
+}
+
 int set_time_now(file_hdr_t *fh)
 {
     struct timespec ts;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
index 304abf0700..6a2cae4ba4 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
@@ -28,6 +28,8 @@
 extern "C"{
 #endif
 
+#define MAX_FILE_HDR_LEN 4096 // Set to 1 sblk
+
 #pragma pack(1)
 
 /**
@@ -47,44 +49,50 @@ extern "C"{
  * +---+---+---+---+---+---+---+---+   | struct rec_hdr_t
  * |       first rid in file       |   |
  * +---+---+---+---+---+---+---+---+  -+
+ * |  fs   | partn |   reserved    |
+ * +---+---+---+---+---+---+---+---+
+ * |           file-size           |
+ * +---+---+---+---+---+---+---+---+
  * |              fro              |
  * +---+---+---+---+---+---+---+---+
  * |           timestamp (sec)     |
  * +---+---+---+---+---+---+---+---+
  * |           timestamp (ns)      |
  * +---+---+---+---+---+---+---+---+
- * |  file-count   |    reserved   |
- * +---+---+---+---+---+---+---+---+
- * |           file-size           |
- * +---+---+---+---+---+---+---+---+
  * |          file-number          |
  * +---+---+---+---+---+---+---+---+
- * | n-len |  Queue Name...        |
+ * |  qnl  | Queue Name...         |
  * +-------+                       |
  * |                               |
  * +---+---+---+---+---+---+---+---+
  *
- * ver = file version (If the format or encoding of this file changes, then this
- *       number should be incremented)
+ * ver = Journal version
+ * rid = Record ID
+ * fs = File header size in sblks (defined by JRNL_SBLK_SIZE)
+ * partn = EFP partition from which this file came
  * fro = First Record Offset
- * n-len = Length of the queue name in octets.
+ * qnl = Length of the queue name in octets.
  * 
*/ typedef struct file_hdr_t { - rec_hdr_t _rhdr; /**< Common record header struct, but rid field is used for rid of first compete record in file */ - uint64_t _fro; /**< First Record Offset (FRO) */ - uint64_t _ts_sec; /**< Time stamp (seconds part) */ - uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */ - uint32_t _file_count; /**< Total number of files in linear sequence at time of writing this file */ + rec_hdr_t _rhdr; /**< Common record header struct, but rid field is used for rid of first compete record in file */ + uint16_t _fhdr_size_sblks; /**< File header size in sblks (defined by JRNL_SBLK_SIZE) */ + uint16_t _efp_partition; /**< EFP Partition number from which this file was obtained */ uint32_t _reserved; - uint64_t _file_size; /**< Size of this file in octets, including header sblk */ - uint64_t _file_number; /**< The logical number of this file in a monotonically increasing sequence */ - uint16_t _name_length; /**< Length of the queue name in octets, which follows this struct in the header */ + uint64_t _file_size_kib; /**< Size of this file in KiB, excluding header sblk */ + uint64_t _fro; /**< First Record Offset (FRO) */ + uint64_t _ts_sec; /**< Time stamp (seconds part) */ + uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */ + uint64_t _file_number; /**< The logical number of this file in a monotonically increasing sequence */ + uint16_t _queue_name_len; /**< Length of the queue name in octets, which follows this struct in the header */ } file_hdr_t; -int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size, - const uint64_t file_number); +void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks, + const uint16_t efp_partition, const uint64_t file_size); +int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro, + const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name); +void file_hdr_reset(file_hdr_t* target); +int is_file_hdr_reset(file_hdr_t* target); void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src); int set_time_now(file_hdr_t *fh); void set_time(file_hdr_t *fh, struct timespec *ts); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp index ccc9bf6bab..3062f059cd 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp @@ -45,9 +45,9 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/): _fhdr_ptr_arr(0), _fhdr_aio_cb_arr(0), _cached_offset_dblks(0), - _jfsize_dblks(0), - _jfsize_pgs(0), - _num_jfiles(0), +// _jfsize_dblks(0), +// _jfsize_pgs(0), +// _num_jfiles(0), _enq_busy(false), _deq_busy(false), _abort_busy(false), @@ -65,9 +65,9 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/, _fhdr_ptr_arr(0), _fhdr_aio_cb_arr(0), _cached_offset_dblks(0), - _jfsize_dblks(0), - _jfsize_pgs(0), - _num_jfiles(0), +// _jfsize_dblks(0), +// _jfsize_pgs(0), +// _num_jfiles(0), _enq_busy(false), _deq_busy(false), _abort_busy(false), @@ -94,9 +94,9 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); - _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS; - _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks; - assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0); +// _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS; +// _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks; +// assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0); if (eo) { @@ -555,7 +555,7 @@ wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint3 } void -wmgr::flush_check(iores& res, bool& cont, bool& done) +wmgr::flush_check(iores& res, bool& /*cont*/, bool& done) { // Is page is full, flush if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) @@ -569,6 +569,7 @@ wmgr::flush_check(iores& res, bool& cont, bool& done) done = true; } +/* // If file is full, rotate to next file if (_pg_cntr >= _jfsize_pgs) { @@ -583,6 +584,7 @@ wmgr::flush_check(iores& res, bool& cont, bool& done) done = true; } } +*/ } } @@ -590,12 +592,14 @@ iores wmgr::flush() { iores res = write_flush(); +/* if (_pg_cntr >= _jfsize_pgs) { iores rfres = rotate_file(); if (rfres != RHM_IORES_SUCCESS) res = rfres; } +*/ return res; } @@ -836,12 +840,13 @@ wmgr::is_txn_synced(const std::string& xid) } void -wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages) +wmgr::initialize(aio_callback* const /*cbp*/, const uint32_t /*wcache_pgsize_sblks*/, const uint16_t /*wcache_num_pages*/) { +/* pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); wmgr::clean(); - /*_num_jfiles = _jc->num_jfiles();*/ // TODO: replace for linearstore: _jc->num_jfiles() - if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize /** _num_jfiles*/)) + _num_jfiles = _jc->num_jfiles(); // TODO: replace for linearstore: _jc->num_jfiles() + if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles)) { wmgr::clean(); std::ostringstream oss; @@ -863,6 +868,7 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co _ddtokl.clear(); _cached_offset_dblks = 0; _enq_busy = false; +*/ } iores @@ -988,7 +994,7 @@ void wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro) { file_hdr_t fhdr/*(QLS_FILE_MAGIC, QLS_JRNL_VERSION, rid, fid, lid, fro, _wrfc.owi(), true)*/; - /*int err =*/ ::file_hdr_init(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, rid, fro, 0, 0, 0); + /*int err =*/ ::file_hdr_init(&fhdr, 0, rid, fro, 0, _jc->id().length(), _jc->id().c_str()); std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr)); #ifdef RHM_CLEAN std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr)); @@ -1030,8 +1036,8 @@ wmgr::clean() if (_fhdr_aio_cb_arr) { - for (uint32_t i=0; i<_num_jfiles; i++) - delete _fhdr_aio_cb_arr[i]; +// for (uint32_t i=0; i<_num_jfiles; i++) +// delete _fhdr_aio_cb_arr[i]; std::free(_fhdr_aio_cb_arr); _fhdr_aio_cb_arr = 0; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h index 68de2db58f..a4afd8974c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h @@ -67,9 +67,9 @@ namespace qls_jrnl aio_cb** _fhdr_aio_cb_arr; ///< Array of iocb pointers for file header writes uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks) std::deque _ddtokl; ///< Deferred dequeue data_tok list - uint32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!) - uint32_t _jfsize_pgs; ///< Journal file size in cache pages - uint16_t _num_jfiles; ///< Number of files used in iocb mallocs +// uint32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!) +// uint32_t _jfsize_pgs; ///< Journal file size in cache pages +// uint16_t _num_jfiles; ///< Number of files used in iocb mallocs // TODO: Convert _enq_busy etc into a proper threadsafe lock // TODO: Convert to enum? Are these encodes mutually exclusive? diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml index a9f7b143ad..dce8f7988a 100644 --- a/qpid/cpp/src/qpid/linearstore/management-schema.xml +++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml @@ -22,15 +22,15 @@ - - + + - - - + + + @@ -48,11 +48,11 @@ - - - - - + + + + + @@ -76,13 +76,13 @@ - + - + -- cgit v1.2.1