diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/AsyncResultHandle.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncResultHandle.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncResultHandleImpl.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncResultHandleImpl.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncResultQueue.cpp | 62 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncResultQueueImpl.cpp | 62 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncResultQueueImpl.h (renamed from cpp/src/qpid/broker/AsyncResultQueue.h) | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncStore.cpp | 25 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/AsyncStore.h | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConfigHandle.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/EnqueueHandle.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/EventHandle.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageHandle.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueHandle.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxnHandle.h | 2 |
15 files changed, 133 insertions, 169 deletions
diff --git a/cpp/src/qpid/broker/AsyncResultHandle.cpp b/cpp/src/qpid/broker/AsyncResultHandle.cpp index 26e46fee1c..cdd2231977 100644 --- a/cpp/src/qpid/broker/AsyncResultHandle.cpp +++ b/cpp/src/qpid/broker/AsyncResultHandle.cpp @@ -65,10 +65,16 @@ AsyncResultHandle::getErrMsg() const return impl->getErrMsg(); } -const BrokerAsyncContext* +boost::shared_ptr<BrokerAsyncContext> AsyncResultHandle::getBrokerAsyncContext() const { return impl->getBrokerAsyncContext(); } +void +AsyncResultHandle::invokeAsyncResultCallback() const +{ + impl->getBrokerAsyncContext()->invokeCallback(this); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultHandle.h b/cpp/src/qpid/broker/AsyncResultHandle.h index 6f6290bfcb..f916bde5d3 100644 --- a/cpp/src/qpid/broker/AsyncResultHandle.h +++ b/cpp/src/qpid/broker/AsyncResultHandle.h @@ -43,11 +43,10 @@ public: int getErrNo() const; std::string getErrMsg() const; - const BrokerAsyncContext* getBrokerAsyncContext() const; + boost::shared_ptr<BrokerAsyncContext> getBrokerAsyncContext() const; + void invokeAsyncResultCallback() const; private: - typedef qpid::broker::AsyncResultHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<AsyncResultHandle>; }; diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp index 36d45e7b0a..c8950d8ff1 100644 --- a/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp +++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp @@ -28,17 +28,18 @@ namespace broker { AsyncResultHandleImpl::AsyncResultHandleImpl() : m_errNo(0), - m_errMsg(), - m_bc(0) + m_errMsg() {} -AsyncResultHandleImpl::AsyncResultHandleImpl(const BrokerAsyncContext* bc) : +AsyncResultHandleImpl::AsyncResultHandleImpl(boost::shared_ptr<BrokerAsyncContext> bc) : m_errNo(0), m_errMsg(), m_bc(bc) {} -AsyncResultHandleImpl::AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc) : +AsyncResultHandleImpl::AsyncResultHandleImpl(const int errNo, + const std::string& errMsg, + boost::shared_ptr<BrokerAsyncContext> bc) : m_errNo(errNo), m_errMsg(errMsg), m_bc(bc) @@ -59,7 +60,7 @@ AsyncResultHandleImpl::getErrMsg() const return m_errMsg; } -const BrokerAsyncContext* +boost::shared_ptr<BrokerAsyncContext> AsyncResultHandleImpl::getBrokerAsyncContext() const { return m_bc; diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.h b/cpp/src/qpid/broker/AsyncResultHandleImpl.h index e1bd1fa0e9..4fe6d1248c 100644 --- a/cpp/src/qpid/broker/AsyncResultHandleImpl.h +++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.h @@ -35,18 +35,20 @@ class AsyncResultHandleImpl : public virtual qpid::RefCounted { public: AsyncResultHandleImpl(); - AsyncResultHandleImpl(const BrokerAsyncContext* bc); - AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc); + AsyncResultHandleImpl(boost::shared_ptr<BrokerAsyncContext> bc); + AsyncResultHandleImpl(const int errNo, + const std::string& errMsg, + boost::shared_ptr<BrokerAsyncContext> bc); virtual ~AsyncResultHandleImpl(); int getErrNo() const; std::string getErrMsg() const; - const BrokerAsyncContext* getBrokerAsyncContext() const; + boost::shared_ptr<BrokerAsyncContext> getBrokerAsyncContext() const; private: const int m_errNo; const std::string m_errMsg; - const BrokerAsyncContext* m_bc; + boost::shared_ptr<BrokerAsyncContext> m_bc; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueue.cpp b/cpp/src/qpid/broker/AsyncResultQueue.cpp deleted file mode 100644 index 1094a582f4..0000000000 --- a/cpp/src/qpid/broker/AsyncResultQueue.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file AsyncResultQueue.cpp - */ - -#include "AsyncResultQueue.h" - -namespace qpid { -namespace broker { - -AsyncResultQueue::AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) : - m_resQueue(boost::bind(&AsyncResultQueue::handle, this, _1), poller) -{ - m_resQueue.start(); -} - -AsyncResultQueue::~AsyncResultQueue() -{ - m_resQueue.stop(); -} - -void -AsyncResultQueue::submit(AsyncResultHandle* res) -{ - m_resQueue.push(res); -} - -//static -/* -void -AsyncResultQueue::submit(AsyncResultQueue* arq, AsyncResultHandle* rh) -{ - arq->submit(rh); -} -*/ - -// protected -AsyncResultQueue::ResultQueue::Batch::const_iterator -AsyncResultQueue::handle(const ResultQueue::Batch& e) -{ - return e.end(); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp new file mode 100644 index 0000000000..8c99ce8ef2 --- /dev/null +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncResultQueueImpl.cpp + */ + +#include "AsyncResultHandle.h" +#include "AsyncResultQueueImpl.h" + +namespace qpid { +namespace broker { + +AsyncResultQueueImpl::AsyncResultQueueImpl(const boost::shared_ptr<qpid::sys::Poller>& poller) : + m_resQueue(boost::bind(&AsyncResultQueueImpl::handle, this, _1), poller) +{ + m_resQueue.start(); +} + +AsyncResultQueueImpl::~AsyncResultQueueImpl() +{ + m_resQueue.stop(); +} + +void +AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh) +{ +//std::cout << "==> AsyncResultQueueImpl::submit() errNo=" << arh->getErrNo() << " errMsg=\"" << arh->getErrMsg() << "\"" << std::endl << std::flush; + m_resQueue.push(arh); +} + +// protected +AsyncResultQueueImpl::ResultQueue::Batch::const_iterator +AsyncResultQueueImpl::handle(const ResultQueue::Batch& e) +{ + + for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { +//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush; + if ((*i)->isValid()) { + (*i)->invokeAsyncResultCallback(); + } + } + return e.end(); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueue.h b/cpp/src/qpid/broker/AsyncResultQueueImpl.h index 8881f25bac..fc93c2d806 100644 --- a/cpp/src/qpid/broker/AsyncResultQueue.h +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.h @@ -18,11 +18,13 @@ */ /** - * \file AsyncResultQueue.h + * \file AsyncResultQueueImpl.h */ -#ifndef qpid_broker_AsyncResultQueue_h_ -#define qpid_broker_AsyncResultQueue_h_ +#ifndef qpid_broker_AsyncResultQueueImpl_h_ +#define qpid_broker_AsyncResultQueueImpl_h_ + +#include "AsyncStore.h" #include "qpid/sys/PollableQueue.h" @@ -31,16 +33,15 @@ namespace broker { class AsyncResultHandle; -class AsyncResultQueue +class AsyncResultQueueImpl : public AsyncResultQueue { public: - AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller); - virtual ~AsyncResultQueue(); - void submit(AsyncResultHandle* rh); -// static void submit(AsyncResultQueue* arq, AsyncResultHandle* rh); + AsyncResultQueueImpl(const boost::shared_ptr<qpid::sys::Poller>& poller); + virtual ~AsyncResultQueueImpl(); + virtual void submit(boost::shared_ptr<AsyncResultHandle> arh); protected: - typedef qpid::sys::PollableQueue<const AsyncResultHandle*> ResultQueue; + typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncResultHandle> > ResultQueue; ResultQueue m_resQueue; ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e); @@ -48,4 +49,4 @@ protected: }} // namespace qpid::broker -#endif // qpid_broker_AsyncResultQueue_h_ +#endif // qpid_broker_AsyncResultQueueImpl_h_ diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp index d37b034648..10cb3d27cf 100644 --- a/cpp/src/qpid/broker/AsyncStore.cpp +++ b/cpp/src/qpid/broker/AsyncStore.cpp @@ -22,35 +22,16 @@ namespace qpid { namespace broker { -BrokerAsyncContext::~BrokerAsyncContext() +AsyncResultQueue::~AsyncResultQueue() {} -DataSource::~DataSource() +BrokerAsyncContext::~BrokerAsyncContext() {} -AsyncStore::AsyncStore() +DataSource::~DataSource() {} AsyncStore::~AsyncStore() {} -/* -AsyncResult::AsyncResult() : - errNo(0), - errMsg() -{} - -AsyncResult::AsyncResult(const int errNo, - const std::string& errMsg) : - errNo(errNo), - errMsg(errMsg) -{} - -void -AsyncResult::destroy() -{ - delete this; -} -*/ - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index c57bdaa552..7e2ee81620 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -24,48 +24,31 @@ // does not allow it. Using a local map<std::string, Variant> definition also precludes forward declaration. #include "qpid/types/Variant.h" // qpid::types::Variant::Map +#include <boost/shared_ptr.hpp> #include <stdint.h> #include <string> namespace qpid { namespace broker { -// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting -// Subclass this for specific contexts -class BrokerAsyncContext { -public: - virtual ~BrokerAsyncContext(); -}; - -// Callback definition: -//struct AsyncResult { -// int errNo; // 0 implies no error -// std::string errMsg; -// AsyncResult(); -// AsyncResult(const int errNo, -// const std::string& errMsg); -// void destroy(); -//}; -//typedef void (*ResultCallback)(const AsyncResult*, BrokerAsyncContext*); - +// This handle carries async op results class AsyncResultHandle; -class AsyncResultQueue; // Implements the result callback function -// Singleton class in broker which contains return pollable queue. Use submitAsyncResult() to add reulsts to queue. -class AsyncResultHandler { +// Broker to subclass as a pollable queue +class AsyncResultQueue { public: - virtual ~AsyncResultHandler(); - - // Factory method to create result handle - - virtual AsyncResultHandle createAsyncResultHandle(const int errNo, const std::string& errMsg, BrokerAsyncContext*) = 0; - - // Async return interface + virtual ~AsyncResultQueue(); + // TODO: Remove boost::shared_ptr<> from this interface + virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0; +}; - virtual void submitAsyncResult(AsyncResultHandle&) = 0; +// Subclass this for specific contexts +class BrokerAsyncContext { +public: + virtual ~BrokerAsyncContext(); + virtual AsyncResultQueue* getAsyncResultQueue() const = 0; + virtual void invokeCallback(const AsyncResultHandle* const) const = 0; }; -typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultHandle*); -//typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultQueue*, AsyncResultHandle*); class DataSource { public: @@ -74,6 +57,9 @@ public: virtual void write(char* target) = 0; }; +// Callback invoked by AsyncResultQueue to pass back async results +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); + class ConfigHandle; class EnqueueHandle; class EventHandle; @@ -85,39 +71,39 @@ class TxnHandle; // Subclassed by store: class AsyncStore { public: - AsyncStore(); virtual ~AsyncStore(); - // Factory methods for creating handles + // --- Factory methods for creating handles --- virtual ConfigHandle createConfigHandle() = 0; virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0; virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0; - virtual MessageHandle createMessageHandle(const DataSource*) = 0; + virtual MessageHandle createMessageHandle(const DataSource* const) = 0; virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0; - virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; // Distr. txns must supply xid - // Store async interface + // --- Store async interface --- - virtual void submitPrepare(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; // Distributed txns only - virtual void submitCommit(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitAbort(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; + // TODO: Remove boost::shared_ptr<> from this interface + virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only + virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(ConfigHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDestroy(ConfigHandle&, ResultCallback, BrokerAsyncContext*) = 0; + virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(QueueHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDestroy(QueueHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitFlush(QueueHandle&, ResultCallback, BrokerAsyncContext*) = 0; + virtual void submitCreate(QueueHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitFlush(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(EventHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitCreate(EventHandle&, const DataSource*, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDestroy(EventHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDestroy(EventHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; + virtual void submitCreate(EventHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitCreate(EventHandle&, const DataSource* const, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(EventHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(EventHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDequeue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; + virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDequeue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Legacy - Restore FTD message, is NOT async! virtual int loadContent(MessageHandle&, QueueHandle&, char* data, uint64_t offset, const uint64_t length) = 0; diff --git a/cpp/src/qpid/broker/ConfigHandle.h b/cpp/src/qpid/broker/ConfigHandle.h index e2cdca6f15..67009bf57a 100644 --- a/cpp/src/qpid/broker/ConfigHandle.h +++ b/cpp/src/qpid/broker/ConfigHandle.h @@ -44,8 +44,6 @@ public: // <none> private: - typedef qpid::asyncStore::ConfigHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<ConfigHandle>; }; diff --git a/cpp/src/qpid/broker/EnqueueHandle.h b/cpp/src/qpid/broker/EnqueueHandle.h index 3ab6885497..6053d1879c 100644 --- a/cpp/src/qpid/broker/EnqueueHandle.h +++ b/cpp/src/qpid/broker/EnqueueHandle.h @@ -44,8 +44,6 @@ public: // <none> private: - typedef qpid::asyncStore::EnqueueHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<EnqueueHandle>; }; diff --git a/cpp/src/qpid/broker/EventHandle.h b/cpp/src/qpid/broker/EventHandle.h index 355cb3a091..8ded98be4a 100644 --- a/cpp/src/qpid/broker/EventHandle.h +++ b/cpp/src/qpid/broker/EventHandle.h @@ -44,8 +44,6 @@ public: const std::string& getKey() const; private: - typedef qpid::asyncStore::EventHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<EventHandle>; }; diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h index 9339d81f32..739c53f7d3 100644 --- a/cpp/src/qpid/broker/MessageHandle.h +++ b/cpp/src/qpid/broker/MessageHandle.h @@ -45,8 +45,6 @@ public: // <none> private: - //typedef qpid::asyncStore::MessageHandleImpl Impl; - //Impl* impl; friend class qpid::messaging::PrivateImplRef<MessageHandle>; }; diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h index a8caa03f97..cb366e2880 100644 --- a/cpp/src/qpid/broker/QueueHandle.h +++ b/cpp/src/qpid/broker/QueueHandle.h @@ -44,8 +44,6 @@ public: const std::string& getName() const; private: - typedef qpid::asyncStore::QueueHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<QueueHandle>; }; diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h index 814b4ea0b3..5981b89026 100644 --- a/cpp/src/qpid/broker/TxnHandle.h +++ b/cpp/src/qpid/broker/TxnHandle.h @@ -45,8 +45,6 @@ public: bool is2pc() const; private: - typedef qpid::asyncStore::TxnHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<TxnHandle>; }; |
