diff options
author | Gordon Sim <gsim@apache.org> | 2013-05-24 00:50:47 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-05-24 00:50:47 +0000 |
commit | 2f6824ec2313aebf3a9b32df1bbd23c74bc3fdf5 (patch) | |
tree | f113cb1c0f801d810ae6e5cb5ac25435d218c2c6 /cpp | |
parent | 5d35a022c91a40fd5960100444c4520df5416b7e (diff) | |
download | qpid-python-2f6824ec2313aebf3a9b32df1bbd23c74bc3fdf5.tar.gz |
QPID-4859: ensure flush is called on journals
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1485909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/IngressCompletion.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/IngressCompletion.h | 51 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 2 |
8 files changed, 113 insertions, 30 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 8ceeb824c4..2cc2a6fece 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1198,6 +1198,7 @@ set (qpidbroker_SOURCES qpid/broker/ExchangeRegistry.cpp qpid/broker/FanOutExchange.cpp qpid/broker/HeadersExchange.cpp + qpid/broker/IngressCompletion.cpp qpid/broker/Link.cpp qpid/broker/LinkRegistry.cpp qpid/broker/LossyQueue.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 9c2ca83214..1c129e196a 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -635,6 +635,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/HeadersExchange.cpp \ qpid/broker/HeadersExchange.h \ qpid/broker/AsyncCompletion.h \ + qpid/broker/IngressCompletion.h \ + qpid/broker/IngressCompletion.cpp \ qpid/broker/IndexedDeque.h \ qpid/broker/Link.cpp \ qpid/broker/Link.h \ diff --git a/cpp/src/qpid/broker/IngressCompletion.cpp b/cpp/src/qpid/broker/IngressCompletion.cpp new file mode 100644 index 0000000000..5affd2b940 --- /dev/null +++ b/cpp/src/qpid/broker/IngressCompletion.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "IngressCompletion.h" +#include "Queue.h" + +namespace qpid { +namespace broker { +IngressCompletion::~IngressCompletion() {} + +void IngressCompletion::enqueueAsync(boost::shared_ptr<Queue> q) +{ + qpid::sys::Mutex::ScopedLock l(lock); + queues.push_back(q); +} + +void IngressCompletion::flush() +{ + Queues copy; + { + qpid::sys::Mutex::ScopedLock l(lock); + queues.swap(copy); + } + for (Queues::const_iterator i = copy.begin(); i != copy.end(); ++i) { + (*i)->flush(); + } +} +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/IngressCompletion.h b/cpp/src/qpid/broker/IngressCompletion.h new file mode 100644 index 0000000000..6fdbb12aa1 --- /dev/null +++ b/cpp/src/qpid/broker/IngressCompletion.h @@ -0,0 +1,51 @@ +#ifndef QPID_BROKER_INGRESSCOMPLETION_H +#define QPID_BROKER_INGRESSCOMPLETION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AsyncCompletion.h" +#include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> +#include <vector> + +namespace qpid { +namespace broker { + +class Queue; +/** + * An AsyncCompletion object for async enqueues, that can be flushed + * when needed + */ +class IngressCompletion : public AsyncCompletion +{ + public: + virtual ~IngressCompletion(); + + void enqueueAsync(boost::shared_ptr<Queue>); + void flush(); + private: + typedef std::vector<boost::shared_ptr<Queue> > Queues; + Queues queues; + qpid::sys::Mutex lock; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_INGRESSCOMPLETION_H*/ diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index feff04712a..1da0bf7648 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -50,7 +50,7 @@ enum MessageState class Message { public: - class Encoding : public AsyncCompletion + class Encoding : public IngressCompletion { public: virtual ~Encoding() {} diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index b604f77aab..afc0976d61 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -1,3 +1,4 @@ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -21,7 +22,7 @@ #include "qpid/broker/PersistableMessage.h" -#include "qpid/broker/MessageStore.h" +#include "qpid/broker/Queue.h" #include <iostream> using namespace qpid::broker; @@ -32,7 +33,7 @@ namespace broker { PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : ingressCompletion(0), persistenceId(0) {} -void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) +void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<IngressCompletion> i) { ingressCompletion = i.get(); /** @@ -57,21 +58,13 @@ void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompleti } } - -void PersistableMessage::flush() -{ - //TODO: is this really the right place for this? -} - - -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*) +void PersistableMessage::enqueueAsync(boost::shared_ptr<Queue> q) { enqueueStart(); + ingressCompletion->enqueueAsync(q); } -bool PersistableMessage::isDequeueComplete() { return false; } void PersistableMessage::dequeueComplete() {} -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {} }} diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 2c33dd5ecb..85ac4a3b85 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -31,8 +31,8 @@ #include "qpid/framing/amqp_types.h" #include "qpid/framing/amqp_framing.h" #include "qpid/sys/Mutex.h" -#include "qpid/broker/PersistableQueue.h" -#include "qpid/broker/AsyncCompletion.h" +#include "qpid/broker/IngressCompletion.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace types { @@ -57,36 +57,27 @@ class PersistableMessage : public Persistable * operations have completed, the transfer of this message from the client * may be considered complete. */ - AsyncCompletion* ingressCompletion; - boost::intrusive_ptr<AsyncCompletion> holder; + IngressCompletion* ingressCompletion; + boost::intrusive_ptr<IngressCompletion> holder; mutable uint64_t persistenceId; public: virtual ~PersistableMessage(); PersistableMessage(); - void flush(); - - QPID_BROKER_EXTERN void setStore(MessageStore*); - virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; /** track the progress of a message received by the broker - see ingressCompletion above */ QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); } - QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; } - QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i); + QPID_BROKER_INLINE_EXTERN IngressCompletion& getIngressCompletion() { return *ingressCompletion; } + QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<IngressCompletion> i); QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); } QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); } - QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, - MessageStore* _store); - + QPID_BROKER_EXTERN void enqueueAsync(boost::shared_ptr<Queue> queue); - QPID_BROKER_EXTERN bool isDequeueComplete(); QPID_BROKER_EXTERN void dequeueComplete(); - QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, - MessageStore* _store); uint64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index f82fc815c9..c852a61f6e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -846,7 +846,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg) // when it considers the message stored. boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext(); assert(pmsg); - pmsg->enqueueAsync(shared_from_this(), store); + pmsg->enqueueAsync(shared_from_this()); try { store->enqueue(ctxt, pmsg, *this); } catch (...) { |