summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-01 15:30:01 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-01 15:30:01 +0000
commitad9bebb1157f009151973cf721fdebdd663d39e3 (patch)
tree3b8dc0a9fa3de3b88bcbb82572a06cb579fa3002 /cpp/src/qpid
parent220841d24ff48f27339000e887d5465a53c39013 (diff)
downloadqpid-python-ad9bebb1157f009151973cf721fdebdd663d39e3.tar.gz
WIP: Non-transactional message path in place. Transactions not working.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1345240 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp32
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h6
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp6
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h85
-rw-r--r--cpp/src/qpid/broker/AsyncStore.cpp3
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h11
-rw-r--r--cpp/src/qpid/broker/BrokerAsyncContext.h15
-rw-r--r--cpp/src/qpid/broker/MessageHandle.h7
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp48
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h18
10 files changed, 154 insertions, 77 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 5a4905fef6..083034acc4 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -246,9 +246,9 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
- qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerAsyncContext* brokerCtxt)
{
AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
@@ -260,18 +260,6 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
-{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- resultCb,
- brokerCtxt);
- m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt)
@@ -282,18 +270,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
resultCb,
brokerCtxt);
m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
-{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- resultCb,
- brokerCtxt);
- m_operations.submit(op);
+//delete op;
+//delete brokerCtxt;
}
void
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 7e3b3e94da..0298c74dc5 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -111,16 +111,10 @@ public:
qpid::broker::BrokerAsyncContext* brokerCtxt);
void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
- void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt);
void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
- void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt);
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 1e52eb3612..69ddf7645e 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -23,8 +23,6 @@
#include "OperationQueue.h"
-#include "qpid/broker/BrokerAsyncContext.h"
-
namespace qpid {
namespace asyncStore {
@@ -42,7 +40,7 @@ OperationQueue::~OperationQueue()
void
OperationQueue::submit(const AsyncOperation* op)
{
-//std::cout << "***** OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
+//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
m_opQueue.push(op);
}
@@ -51,7 +49,7 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-//std::cout << "##### OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
+//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
if ((*i)->m_resCb) {
((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
} else {
diff --git a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
index b13caa5462..207bbc68f2 100644
--- a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
+++ b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
@@ -40,7 +40,7 @@ public:
/**
* \brief Constructor with an option to set an inital value for the counter.
*/
- AtomicCounter(T initialValue = T(0)) :
+ AtomicCounter(const T initialValue = T(0)) :
m_cnt(initialValue)
{}
@@ -58,13 +58,90 @@ public:
* first call to next() will return 1. Upon overflow, the counter will be incremented twice so as to avoid
* returning the value 0.
*/
- virtual T next()
+ T
+ next()
{
- // --- START OF CRITICAL SECTION ---
ScopedLock l(m_mutex);
while (!++m_cnt) ; // Cannot return 0x0 if m_cnt should overflow
return m_cnt;
- } // --- END OF CRITICAL SECTION ---
+ }
+
+ void
+ operator++()
+ {
+ ScopedLock l(m_mutex);
+ ++m_cnt;
+ }
+
+ void
+ operator--()
+ {
+ ScopedLock l(m_mutex);
+ --m_cnt;
+ }
+
+ T
+ get() const
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt;
+ }
+
+ bool
+ operator==(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt == rhs.get();
+ }
+
+ bool
+ operator==(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt == rhs;
+ }
+
+ bool
+ operator!=(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt != rhs.get();
+ }
+
+ bool
+ operator!=(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt != rhs;
+ }
+
+ bool
+ operator>(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt > rhs.get();
+ }
+
+ bool
+ operator>(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt > rhs;
+ }
+
+ bool
+ operator<(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt < rhs.get();
+ }
+
+ bool
+ operator<(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt < rhs;
+ }
protected:
T m_cnt; ///< Internal count value
diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp
index ff3e77dba5..649049bf41 100644
--- a/cpp/src/qpid/broker/AsyncStore.cpp
+++ b/cpp/src/qpid/broker/AsyncStore.cpp
@@ -22,6 +22,9 @@
namespace qpid {
namespace broker {
+BrokerAsyncContext::~BrokerAsyncContext()
+{}
+
DataSource::~DataSource()
{}
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 15e9120edb..eb47d62cf0 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -28,11 +28,14 @@
#include <string>
namespace qpid {
-
namespace broker {
-// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting:
-class BrokerAsyncContext;
+// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting
+// Subclass this for specific contexts
+class BrokerAsyncContext {
+public:
+ virtual ~BrokerAsyncContext();
+};
// Subclassed by broker:
class DataSource {
@@ -96,9 +99,7 @@ public:
virtual void submitDestroy(EventHandle&, ResultCallback, BrokerAsyncContext*) = 0;
virtual void submitDestroy(EventHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitEnqueue(EnqueueHandle&, ResultCallback, BrokerAsyncContext*) = 0;
virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitDequeue(EnqueueHandle&, ResultCallback, BrokerAsyncContext*) = 0;
virtual void submitDequeue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
// Legacy - Restore FTD message, is NOT async!
diff --git a/cpp/src/qpid/broker/BrokerAsyncContext.h b/cpp/src/qpid/broker/BrokerAsyncContext.h
deleted file mode 100644
index 38d53a84f1..0000000000
--- a/cpp/src/qpid/broker/BrokerAsyncContext.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#ifndef qpid_broker_BrokerContext_hpp_
-#define qpid_broker_BrokerContext_hpp_
-
-namespace qpid {
-namespace broker {
-
-class BrokerAsyncContext
-{
-public:
- virtual ~BrokerAsyncContext() {}
-};
-
-}} // namespace qpid::broker
-
-#endif // qpid_broker_BrokerContext_hpp_
diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h
index 74c38d92cc..9339d81f32 100644
--- a/cpp/src/qpid/broker/MessageHandle.h
+++ b/cpp/src/qpid/broker/MessageHandle.h
@@ -32,7 +32,8 @@
namespace qpid {
namespace broker {
-class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>, public IdHandle
+class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>,
+ public IdHandle
{
public:
MessageHandle(qpid::asyncStore::MessageHandleImpl* p = 0);
@@ -44,8 +45,8 @@ public:
// <none>
private:
- typedef qpid::asyncStore::MessageHandleImpl Impl;
- Impl* impl;
+ //typedef qpid::asyncStore::MessageHandleImpl Impl;
+ //Impl* impl;
friend class qpid::messaging::PrivateImplRef<MessageHandle>;
};
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 7ba28eb293..957248b522 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -21,7 +21,8 @@
#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/AsyncStore.h"
#include <iostream>
using namespace qpid::broker;
@@ -29,13 +30,12 @@ using namespace qpid::broker;
namespace qpid {
namespace broker {
-class MessageStore;
-
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
asyncDequeueCounter(0),
- store(0)
+ store(0),
+ asyncStore(0)
{}
void PersistableMessage::flush()
@@ -78,8 +78,8 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
return false;
}
-
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+// deprecated
+void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -88,7 +88,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa
}
}
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ asyncStore = _store;
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
+ }
+}
+
+// deprecated
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+ addToSyncList(queue, _store);
+ enqueueStart();
+}
+
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
addToSyncList(queue, _store);
enqueueStart();
}
@@ -111,7 +126,8 @@ void PersistableMessage::dequeueComplete() {
if (notify) allDequeuesComplete();
}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+// deprecated
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -121,6 +137,16 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag
dequeueAsync();
}
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ asyncStore = _store;
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
+ }
+ dequeueAsync();
+}
+
void PersistableMessage::dequeueAsync() {
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
asyncDequeueCounter++;
@@ -128,11 +154,17 @@ void PersistableMessage::dequeueAsync() {
PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
+// deprecated
void PersistableMessage::setStore(MessageStore* s)
{
store = s;
}
+void PersistableMessage::setStore(AsyncStore* s)
+{
+ asyncStore = s;
+}
+
void PersistableMessage::requestContentRelease()
{
contentReleaseState.requested = true;
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index d29c2c45b4..8823cfa638 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -37,6 +37,7 @@ namespace qpid {
namespace broker {
class MessageStore;
+class AsyncStore;
/**
* Base class for persistable messages.
@@ -86,7 +87,8 @@ class PersistableMessage : public Persistable
void setContentReleased();
- MessageStore* store;
+ MessageStore* store; // deprecated, use AsyncStore
+ AsyncStore* asyncStore; // new AsyncStore interface
public:
@@ -105,7 +107,8 @@ class PersistableMessage : public Persistable
QPID_BROKER_EXTERN bool isContentReleased() const;
- QPID_BROKER_EXTERN void setStore(MessageStore*);
+ QPID_BROKER_EXTERN void setStore(MessageStore*); // deprecated
+ QPID_BROKER_EXTERN void setStore(AsyncStore*);
void requestContentRelease();
void blockContentRelease();
bool checkContentReleasable();
@@ -121,20 +124,25 @@ class PersistableMessage : public Persistable
QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
- QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
+ QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
MessageStore* _store);
+ QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
+ AsyncStore* _store);
QPID_BROKER_EXTERN bool isDequeueComplete();
QPID_BROKER_EXTERN void dequeueComplete();
- QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
+ QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
MessageStore* _store);
+ QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
+ AsyncStore* _store);
bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
- void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
+ void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); // deprecated
+ void addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store);
};
}}