diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-06-01 15:30:01 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-01 15:30:01 +0000 |
| commit | ad9bebb1157f009151973cf721fdebdd663d39e3 (patch) | |
| tree | 3b8dc0a9fa3de3b88bcbb82572a06cb579fa3002 /cpp/src/qpid | |
| parent | 220841d24ff48f27339000e887d5465a53c39013 (diff) | |
| download | qpid-python-ad9bebb1157f009151973cf721fdebdd663d39e3.tar.gz | |
WIP: Non-transactional message path in place. Transactions not working.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1345240 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 32 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h | 85 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncStore.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncStore.h | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAsyncContext.h | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageHandle.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 18 |
10 files changed, 154 insertions, 77 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 5a4905fef6..083034acc4 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -246,9 +246,9 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, - qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerAsyncContext* brokerCtxt) { AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), @@ -260,18 +260,6 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) -{ - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - resultCb, - brokerCtxt); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt) @@ -282,18 +270,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, resultCb, brokerCtxt); m_operations.submit(op); -} - -void -AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) -{ - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - resultCb, - brokerCtxt); - m_operations.submit(op); +//delete op; +//delete brokerCtxt; } void diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 7e3b3e94da..0298c74dc5 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -111,16 +111,10 @@ public: qpid::broker::BrokerAsyncContext* brokerCtxt); void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); - void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt); void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); - void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 1e52eb3612..69ddf7645e 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -23,8 +23,6 @@ #include "OperationQueue.h" -#include "qpid/broker/BrokerAsyncContext.h" - namespace qpid { namespace asyncStore { @@ -42,7 +40,7 @@ OperationQueue::~OperationQueue() void OperationQueue::submit(const AsyncOperation* op) { -//std::cout << "***** OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; +//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; m_opQueue.push(op); } @@ -51,7 +49,7 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "##### OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; +//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; if ((*i)->m_resCb) { ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); } else { diff --git a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h index b13caa5462..207bbc68f2 100644 --- a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h +++ b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h @@ -40,7 +40,7 @@ public: /** * \brief Constructor with an option to set an inital value for the counter. */ - AtomicCounter(T initialValue = T(0)) : + AtomicCounter(const T initialValue = T(0)) : m_cnt(initialValue) {} @@ -58,13 +58,90 @@ public: * first call to next() will return 1. Upon overflow, the counter will be incremented twice so as to avoid * returning the value 0. */ - virtual T next() + T + next() { - // --- START OF CRITICAL SECTION --- ScopedLock l(m_mutex); while (!++m_cnt) ; // Cannot return 0x0 if m_cnt should overflow return m_cnt; - } // --- END OF CRITICAL SECTION --- + } + + void + operator++() + { + ScopedLock l(m_mutex); + ++m_cnt; + } + + void + operator--() + { + ScopedLock l(m_mutex); + --m_cnt; + } + + T + get() const + { + ScopedLock l(m_mutex); + return m_cnt; + } + + bool + operator==(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt == rhs.get(); + } + + bool + operator==(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt == rhs; + } + + bool + operator!=(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt != rhs.get(); + } + + bool + operator!=(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt != rhs; + } + + bool + operator>(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt > rhs.get(); + } + + bool + operator>(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt > rhs; + } + + bool + operator<(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt < rhs.get(); + } + + bool + operator<(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt < rhs; + } protected: T m_cnt; ///< Internal count value diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp index ff3e77dba5..649049bf41 100644 --- a/cpp/src/qpid/broker/AsyncStore.cpp +++ b/cpp/src/qpid/broker/AsyncStore.cpp @@ -22,6 +22,9 @@ namespace qpid { namespace broker { +BrokerAsyncContext::~BrokerAsyncContext() +{} + DataSource::~DataSource() {} diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 15e9120edb..eb47d62cf0 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -28,11 +28,14 @@ #include <string> namespace qpid { - namespace broker { -// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting: -class BrokerAsyncContext; +// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting +// Subclass this for specific contexts +class BrokerAsyncContext { +public: + virtual ~BrokerAsyncContext(); +}; // Subclassed by broker: class DataSource { @@ -96,9 +99,7 @@ public: virtual void submitDestroy(EventHandle&, ResultCallback, BrokerAsyncContext*) = 0; virtual void submitDestroy(EventHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitEnqueue(EnqueueHandle&, ResultCallback, BrokerAsyncContext*) = 0; virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDequeue(EnqueueHandle&, ResultCallback, BrokerAsyncContext*) = 0; virtual void submitDequeue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; // Legacy - Restore FTD message, is NOT async! diff --git a/cpp/src/qpid/broker/BrokerAsyncContext.h b/cpp/src/qpid/broker/BrokerAsyncContext.h deleted file mode 100644 index 38d53a84f1..0000000000 --- a/cpp/src/qpid/broker/BrokerAsyncContext.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef qpid_broker_BrokerContext_hpp_ -#define qpid_broker_BrokerContext_hpp_ - -namespace qpid { -namespace broker { - -class BrokerAsyncContext -{ -public: - virtual ~BrokerAsyncContext() {} -}; - -}} // namespace qpid::broker - -#endif // qpid_broker_BrokerContext_hpp_ diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h index 74c38d92cc..9339d81f32 100644 --- a/cpp/src/qpid/broker/MessageHandle.h +++ b/cpp/src/qpid/broker/MessageHandle.h @@ -32,7 +32,8 @@ namespace qpid { namespace broker { -class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>, public IdHandle +class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>, + public IdHandle { public: MessageHandle(qpid::asyncStore::MessageHandleImpl* p = 0); @@ -44,8 +45,8 @@ public: // <none> private: - typedef qpid::asyncStore::MessageHandleImpl Impl; - Impl* impl; + //typedef qpid::asyncStore::MessageHandleImpl Impl; + //Impl* impl; friend class qpid::messaging::PrivateImplRef<MessageHandle>; }; diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 7ba28eb293..957248b522 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -21,7 +21,8 @@ #include "qpid/broker/PersistableMessage.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/AsyncStore.h" #include <iostream> using namespace qpid::broker; @@ -29,13 +30,12 @@ using namespace qpid::broker; namespace qpid { namespace broker { -class MessageStore; - PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : asyncDequeueCounter(0), - store(0) + store(0), + asyncStore(0) {} void PersistableMessage::flush() @@ -78,8 +78,8 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ return false; } - -void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { +// deprecated +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -88,7 +88,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa } } -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + asyncStore = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } +} + +// deprecated +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + addToSyncList(queue, _store); + enqueueStart(); +} + +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) { addToSyncList(queue, _store); enqueueStart(); } @@ -111,7 +126,8 @@ void PersistableMessage::dequeueComplete() { if (notify) allDequeuesComplete(); } -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +// deprecated +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -121,6 +137,16 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag dequeueAsync(); } +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + asyncStore = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } + dequeueAsync(); +} + void PersistableMessage::dequeueAsync() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); asyncDequeueCounter++; @@ -128,11 +154,17 @@ void PersistableMessage::dequeueAsync() { PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} +// deprecated void PersistableMessage::setStore(MessageStore* s) { store = s; } +void PersistableMessage::setStore(AsyncStore* s) +{ + asyncStore = s; +} + void PersistableMessage::requestContentRelease() { contentReleaseState.requested = true; diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index d29c2c45b4..8823cfa638 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -37,6 +37,7 @@ namespace qpid { namespace broker { class MessageStore; +class AsyncStore; /** * Base class for persistable messages. @@ -86,7 +87,8 @@ class PersistableMessage : public Persistable void setContentReleased(); - MessageStore* store; + MessageStore* store; // deprecated, use AsyncStore + AsyncStore* asyncStore; // new AsyncStore interface public: @@ -105,7 +107,8 @@ class PersistableMessage : public Persistable QPID_BROKER_EXTERN bool isContentReleased() const; - QPID_BROKER_EXTERN void setStore(MessageStore*); + QPID_BROKER_EXTERN void setStore(MessageStore*); // deprecated + QPID_BROKER_EXTERN void setStore(AsyncStore*); void requestContentRelease(); void blockContentRelease(); bool checkContentReleasable(); @@ -121,20 +124,25 @@ class PersistableMessage : public Persistable QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); } QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); } - QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, + QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated MessageStore* _store); + QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, + AsyncStore* _store); QPID_BROKER_EXTERN bool isDequeueComplete(); QPID_BROKER_EXTERN void dequeueComplete(); - QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, + QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated MessageStore* _store); + QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, + AsyncStore* _store); bool isStoredOnQueue(PersistableQueue::shared_ptr queue); - void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); + void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); // deprecated + void addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store); }; }} |
