diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-09-24 13:49:13 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-09-24 13:49:13 +0000 |
| commit | c095a631dcb2c7be5e167ed50f658f7c24330a45 (patch) | |
| tree | f3c6dc1e3a9f6e12501c1dcb794d18779db477ac /cpp/src/qpid/asyncStore | |
| parent | 0f327ee25b5ab4b9a38a8620a666e6bfc66000e7 (diff) | |
| download | qpid-python-c095a631dcb2c7be5e167ed50f658f7c24330a45.tar.gz | |
QPID-3858: WIP: Provisional checkin: Wiring of async store interface to broker. Code compiles, but as persistent transactions are currentl disconnected, not all tests pass.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1389378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/asyncStore')
| -rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.cpp | 24 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.h | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 102 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 26 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/ConfigHandleImpl.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/Plugin.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp | 33 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/RecoveryHandleImpl.h | 40 |
9 files changed, 197 insertions, 47 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index 1e3ab51165..2023da2ded 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -25,6 +25,7 @@ #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/AsyncResultHandleImpl.h" +#include "qpid/broker/RecoveryAsyncContext.h" #include "qpid/broker/QueueAsyncContext.h" #include "qpid/broker/TxnAsyncContext.h" @@ -132,6 +133,29 @@ AsyncOpTxnAbort::getOpStr() const { } +// --- class AsyncOpRecover --- + +AsyncOpRecover::AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(rcvrCtxt), store), + m_rcvrHandle(rcvrHandle) +{} + +AsyncOpRecover::~AsyncOpRecover() {} + +void +AsyncOpRecover::executeOp() const { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpRecover::getOpStr() const { + return "RECOVER"; +} + + // --- class AsyncOpConfigCreate --- AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index f22cccad07..ffd6ca44a9 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -90,6 +90,18 @@ private: }; +class AsyncOpRecover: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt, + qpid::broker::AsyncStore* store); + virtual ~AsyncOpRecover(); + virtual void executeOp() const; + virtual const char* getOpStr() const; +private: + qpid::broker::RecoveryHandle& m_rcvrHandle; +}; + class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation { public: AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 9c1ff42fa2..85fd981862 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -29,6 +29,7 @@ #include "qpid/asyncStore/EventHandleImpl.h" #include "qpid/asyncStore/MessageHandleImpl.h" #include "qpid/asyncStore/QueueHandleImpl.h" +#include "qpid/asyncStore/RecoveryHandleImpl.h" #include "qpid/asyncStore/TxnHandleImpl.h" #include "qpid/broker/ConfigHandle.h" #include "qpid/broker/EnqueueHandle.h" @@ -36,6 +37,8 @@ #include "qpid/broker/MessageHandle.h" #include "qpid/broker/QueueAsyncContext.h" #include "qpid/broker/QueueHandle.h" +#include "qpid/broker/RecoveryAsyncContext.h" +#include "qpid/broker/RecoveryHandle.h" #include "qpid/broker/TxnAsyncContext.h" #include "qpid/broker/TxnHandle.h" @@ -48,12 +51,17 @@ AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, m_opts(opts), m_runState(), m_operations(m_poller) -{} +{ + QPID_LOG(info, "AsyncStoreImpl::AsyncStoreImpl()"); +} AsyncStoreImpl::~AsyncStoreImpl() {} void -AsyncStoreImpl::initialize() {} +AsyncStoreImpl::initialize(bool truncateFlag, + bool saveFlag) { + QPID_LOG(info, "AsyncStoreImpl::initialize() truncateFlag=" << (truncateFlag?"T":"F") << " saveFlag=" << (saveFlag?"T":"F")); +} uint64_t AsyncStoreImpl::getNextRid() { @@ -88,25 +96,42 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt, this)); - TxnCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) { + assert(txnCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, txnCtxt, this)); + txnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt, this)); - TxnCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) { + assert(txnCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, txnCtxt, this)); + txnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt, this)); - TxnCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) { + assert(txnCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, txnCtxt, this)); + txnCtxt->setOpStr(op->getOpStr()); + m_operations.submit(op); +} + +qpid::broker::RecoveryHandle +AsyncStoreImpl::createRecoveryHandle() { + return qpid::broker::RecoveryHandle(new RecoveryHandleImpl()); +} + +void +AsyncStoreImpl::submitRecover(qpid::broker::RecoveryHandle& rcvrHandle, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt) { + assert(rcvrCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpRecover(rcvrHandle, rcvrCtxt, this)); + rcvrCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -144,6 +169,7 @@ void AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { + assert(brokerCtxt.get() != 0); boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); @@ -152,6 +178,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { + assert(brokerCtxt.get() != 0); boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); @@ -160,25 +187,28 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, void AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -187,6 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, const qpid::broker::DataSource* const dataSrc, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { + assert(brokerCtxt.get() != 0); boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); @@ -196,6 +227,7 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { + assert(brokerCtxt.get() != 0); boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); @@ -204,28 +236,30 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } -int -AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/, - qpid::broker::QueueHandle& /*queueHandle*/, - char* /*data*/, - uint64_t /*offset*/, - const uint64_t /*length*/) { - return 0; -} +//int +//AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/, +// qpid::broker::QueueHandle& /*queueHandle*/, +// char* /*data*/, +// uint64_t /*offset*/, +// const uint64_t /*length*/) { +// return 0; +//} }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 12a7f62c09..dd2cabaa69 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -47,7 +47,7 @@ public: AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, const AsyncStoreOptions& opts); virtual ~AsyncStoreImpl(); - void initialize(); + void initialize(bool truncateFlag = false, bool saveFlag = true); uint64_t getNextRid(); // Global counter for journal RIDs // --- Management --- @@ -65,11 +65,17 @@ public: qpid::broker::SimpleTxnBuffer* tb); void submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt); + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt); void submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt); void submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt); + + + // --- Interface from AsyncRecoverable --- + qpid::broker::RecoveryHandle createRecoveryHandle(); + void submitRecover(qpid::broker::RecoveryHandle& rcvrHandle, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt); // --- Interface from AsyncStore --- @@ -112,12 +118,12 @@ public: qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt); - // Legacy - Restore FTD message, is NOT async! - virtual int loadContent(qpid::broker::MessageHandle& msgHandle, - qpid::broker::QueueHandle& queueHandle, - char* data, - uint64_t offset, - const uint64_t length); +// // Legacy - Restore FTD message, is NOT async! +// virtual int loadContent(qpid::broker::MessageHandle& msgHandle, +// qpid::broker::QueueHandle& queueHandle, +// char* data, +// uint64_t offset, +// const uint64_t length); private: boost::shared_ptr<qpid::sys::Poller> m_poller; diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.h b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h index 17069ec21c..2e49e0adb9 100644 --- a/cpp/src/qpid/asyncStore/ConfigHandleImpl.h +++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h @@ -29,8 +29,7 @@ namespace qpid { namespace asyncStore { -class ConfigHandleImpl : public virtual qpid::RefCounted -{ +class ConfigHandleImpl : public virtual qpid::RefCounted { public: ConfigHandleImpl(); virtual ~ConfigHandleImpl(); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 4e05fad10d..dff5387827 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -50,6 +50,8 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { try { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { +// DEBUG: kpvdr +std::cout << "#### OperationQueue::handle(): op=" << (*i)->getOpStr() << std::endl << std::flush; (*i)->executeOp(); // Do store work here } } catch (const std::exception& e) { diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp index f6930272a4..7395fc9b87 100644 --- a/cpp/src/qpid/asyncStore/Plugin.cpp +++ b/cpp/src/qpid/asyncStore/Plugin.cpp @@ -43,7 +43,7 @@ Plugin::earlyInitialize(Target& target) { } m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options)); boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store); - broker->setAsyncStore(brokerAsyncStore); + broker->setStore(brokerAsyncStore); boost::function<void()> fn = boost::bind(&Plugin::finalize, this); target.addFinalizer(fn); QPID_LOG(info, "asyncStore: Initialized using path " << m_options.m_storeDir); diff --git a/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp new file mode 100644 index 0000000000..a976f48e17 --- /dev/null +++ b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryHandleImpl.cpp + */ + +#include "RecoveryHandleImpl.h" + +namespace qpid { +namespace asyncStore { + +RecoveryHandleImpl::RecoveryHandleImpl() {} + +RecoveryHandleImpl::~RecoveryHandleImpl() {} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h new file mode 100644 index 0000000000..8abd0b6f65 --- /dev/null +++ b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoverHandleImpl.h + */ + +#ifndef qpid_asyncStore_RecoveryHandleImpl_h_ +#define qpid_asyncStore_RecoveryHandleImpl_h_ + +#include "qpid/RefCounted.h" + +namespace qpid { +namespace asyncStore { + +class RecoveryHandleImpl: public virtual qpid::RefCounted { +public: + RecoveryHandleImpl(); + virtual ~RecoveryHandleImpl(); +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_RecoveryHandleImpl_h_ |
