diff options
| author | Charles E. Rolke <chug@apache.org> | 2013-01-07 20:17:19 +0000 |
|---|---|---|
| committer | Charles E. Rolke <chug@apache.org> | 2013-01-07 20:17:19 +0000 |
| commit | 985928c7cb19799e3e1da7e0dcd18823a18b698c (patch) | |
| tree | 98159eae28999c1e0fc6ba9e02b9b0692722b5cb /qpid/cpp/src | |
| parent | e5c8d687df6815d49609fa9bd974c08ac521d2a2 (diff) | |
| download | qpid-python-985928c7cb19799e3e1da7e0dcd18823a18b698c.tar.gz | |
QPID-1726 ASF licensed Qpid store
Update legacystore from changes in source svn revisions 4514 to 4528.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1429990 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/legacystore/JournalImpl.cpp | 35 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/legacystore/JournalImpl.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp | 28 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/legacystore/StorePlugin.cpp | 4 |
5 files changed, 43 insertions, 40 deletions
diff --git a/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp b/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp index 27894119c9..ba3f2aecae 100644 --- a/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp @@ -68,7 +68,6 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_, _dlen(0), _dtok(), _external(false), - _mgmtObject(), deleteCallback(onDelete) { getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); @@ -97,8 +96,9 @@ JournalImpl::~JournalImpl() inactivityFireEventPtr->cancel(); free_read_buffers(); - if (_mgmtObject != 0) { + if (_mgmtObject.get() != 0) { _mgmtObject->resourceDestroy(); + _mgmtObject.reset(); } log(LOG_NOTICE, "Destroyed"); @@ -148,7 +148,7 @@ JournalImpl::initialize(const u_int16_t num_jfiles, jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, cbp); log(LOG_DEBUG, "Initialization complete"); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); _mgmtObject->set_autoExpand(_lpmgr.is_ae()); @@ -182,7 +182,7 @@ JournalImpl::recover(const u_int16_t num_jfiles, oss1 << " wcache_num_pages=" << wcache_num_pages; log(LOG_DEBUG, oss1.str()); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); _mgmtObject->set_autoExpand(_lpmgr.is_ae()); @@ -227,7 +227,7 @@ JournalImpl::recover(const u_int16_t num_jfiles, oss2 << "; journal now read-only."; log(LOG_DEBUG, oss2.str()); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { _mgmtObject->inc_recordDepth(_emap.size()); _mgmtObject->inc_enqueues(_emap.size()); @@ -348,7 +348,7 @@ JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_d { handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient)); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { _mgmtObject->inc_enqueues(); _mgmtObject->inc_recordDepth(); @@ -361,7 +361,7 @@ JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dto { handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { _mgmtObject->inc_enqueues(); _mgmtObject->inc_recordDepth(); @@ -372,11 +372,11 @@ void JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient) { - bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false; + bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient)); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { if (!txn_incr) // If this xid was not in _tmap, it will be now... _mgmtObject->inc_txn(); @@ -390,11 +390,11 @@ void JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp, const std::string& xid, const bool transient) { - bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false; + bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient)); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { if (!txn_incr) // If this xid was not in _tmap, it will be now... _mgmtObject->inc_txn(); @@ -409,7 +409,7 @@ JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_comm { handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit)); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { _mgmtObject->inc_dequeues(); _mgmtObject->inc_txnDequeues(); @@ -420,11 +420,11 @@ JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_comm void JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit) { - bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false; + bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit)); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { if (!txn_incr) // If this xid was not in _tmap, it will be now... _mgmtObject->inc_txn(); @@ -439,7 +439,7 @@ JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid) { handleIoResult(jcntl::txn_abort(dtokp, xid)); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { _mgmtObject->dec_txn(); _mgmtObject->inc_txnAborts(); @@ -451,7 +451,7 @@ JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid) { handleIoResult(jcntl::txn_commit(dtokp, xid)); - if (_mgmtObject != 0) + if (_mgmtObject.get() != 0) { _mgmtObject->dec_txn(); _mgmtObject->inc_txnCommits(); @@ -466,8 +466,9 @@ JournalImpl::stop(bool block_till_aio_cmpl) ifep->cancel(); jcntl::stop(block_till_aio_cmpl); - if (_mgmtObject != 0) { + if (_mgmtObject.get() != 0) { _mgmtObject->resourceDestroy(); + _mgmtObject.reset(); } } diff --git a/qpid/cpp/src/qpid/legacystore/JournalImpl.h b/qpid/cpp/src/qpid/legacystore/JournalImpl.h index d3b6b65667..7227b2ffd4 100644 --- a/qpid/cpp/src/qpid/legacystore/JournalImpl.h +++ b/qpid/cpp/src/qpid/legacystore/JournalImpl.h @@ -166,7 +166,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0); // Overrides for write inactivity timer - void enqueue_data_record(const void* const data_buffGetManagementObject, const size_t tot_data_len, + void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, const size_t this_data_len, mrg::journal::data_tok* dtokp, const bool transient = false); @@ -227,10 +227,10 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal // Management instrumentation callbacks overridden from jcntl inline void instr_incr_outstanding_aio_cnt() { - if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs(); + if (_mgmtObject.get() != 0) _mgmtObject->inc_outstandingAIOs(); } inline void instr_decr_outstanding_aio_cnt() { - if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs(); + if (_mgmtObject.get() != 0) _mgmtObject->dec_outstandingAIOs(); } }; // class JournalImpl diff --git a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp index 6a6f41cdf7..69e9f48a17 100644 --- a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp @@ -57,7 +57,7 @@ MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid, tpc_flag(_tpc_flag) {} -MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) : +MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) : numJrnlFiles(0), autoJrnlExpand(false), autoJrnlExpandMaxFiles(0), @@ -72,7 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath highestRid(0), isInit(false), envPath(envpath), - timer(timer_), + broker(broker_), mgmtObject(), agent(0) {} @@ -218,7 +218,7 @@ void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts, autoJrnlExpandMaxFiles = p; } -void MessageStoreImpl::initManagement (qpid::broker::Broker* broker) +void MessageStoreImpl::initManagement () { if (broker != 0) { agent = broker->getManagementAgent(); @@ -364,7 +364,7 @@ void MessageStoreImpl::init() // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure. // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026 - tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); + tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); isInit = true; } catch (const DbException& e) { if (e.get_errno() == DB_VERSION_MISMATCH) @@ -402,8 +402,9 @@ void MessageStoreImpl::finalize() } } - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->resourceDestroy(); + mgmtObject.reset(); } } @@ -443,7 +444,7 @@ void MessageStoreImpl::chkTplStoreInit() if (!tplStorePtr->is_ready()) { journal::jdir::create_dir(getTplBaseDir()); tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks); - if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true); + if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } } @@ -479,8 +480,9 @@ MessageStoreImpl::~MessageStoreImpl() QPID_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()"); } - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->resourceDestroy(); + mgmtObject.reset(); } } @@ -513,7 +515,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, return; } - jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue), std::string("JournalData"), + jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { @@ -705,7 +707,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry) //recover transactions: for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) { const PreparedTransaction pt = *i; - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_tplTransactionDepth(); mgmtObject->inc_tplTxnPrepares(); } @@ -799,7 +801,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue."); break; } - jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName), std::string("JournalData"), + jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { @@ -979,6 +981,8 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, // At some future point if delivery attempts are stored, then this call would // become optional depending on that information. msg->setRedelivered(); + // Reset the TTL for the recovered message + msg->computeExpiration(broker->getExpiryPolicy()); u_int32_t contentOffset = headerSize + preambleLength; u_int64_t contentSize = readSize - contentOffset; @@ -1436,7 +1440,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn, tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit); } txn.complete(commit); - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->dec_tplTransactionDepth(); if (commit) mgmtObject->inc_tplTxnCommits(); @@ -1490,7 +1494,7 @@ void MessageStoreImpl::localPrepare(TxnCtxt* ctxt) ctxt->prepare(tplStorePtr.get()); // make sure all the data is written to disk before returning ctxt->sync(); - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_tplTransactionDepth(); mgmtObject->inc_tplTxnPrepares(); } diff --git a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h index c21f540139..68aceedfbb 100644 --- a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h @@ -150,7 +150,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem u_int64_t highestRid; bool isInit; const char* envPath; - qpid::sys::Timer& timer; + qpid::broker::Broker* broker; qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject; qpid::management::ManagementAgent* agent; @@ -273,7 +273,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem public: typedef boost::shared_ptr<MessageStoreImpl> shared_ptr; - MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0); + MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0); virtual ~MessageStoreImpl(); @@ -292,7 +292,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem void truncateInit(const bool saveStoreContent = false); - void initManagement (qpid::broker::Broker* broker); + void initManagement (); void finalize(); @@ -364,9 +364,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } - using qpid::management::Manageable::ManagementMethod; - - inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&) + inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&) { return qpid::management::Manageable::STATUS_OK; } std::string getStoreDir() const; diff --git a/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp b/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp index e8795c15e6..f9b77ce02c 100644 --- a/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp +++ b/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp @@ -44,7 +44,7 @@ struct StorePlugin : public Plugin { { Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; - store.reset(new MessageStoreImpl(broker->getTimer())); + store.reset(new MessageStoreImpl(broker)); DataDir& dataDir = broker->getDataDir (); if (options.storeDir.empty ()) { @@ -65,7 +65,7 @@ struct StorePlugin : public Plugin { if (!broker) return; if (!store) return; QPID_LOG(info, "Enabling management instrumentation for the store."); - store->initManagement(broker); + store->initManagement(); } void finalize() |
