summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-05-24 00:50:47 +0000
committerGordon Sim <gsim@apache.org>2013-05-24 00:50:47 +0000
commit2f6824ec2313aebf3a9b32df1bbd23c74bc3fdf5 (patch)
treef113cb1c0f801d810ae6e5cb5ac25435d218c2c6 /cpp
parent5d35a022c91a40fd5960100444c4520df5416b7e (diff)
downloadqpid-python-2f6824ec2313aebf3a9b32df1bbd23c74bc3fdf5.tar.gz
QPID-4859: ensure flush is called on journals
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1485909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/CMakeLists.txt1
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/IngressCompletion.cpp45
-rw-r--r--cpp/src/qpid/broker/IngressCompletion.h51
-rw-r--r--cpp/src/qpid/broker/Message.h2
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp17
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h23
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
8 files changed, 113 insertions, 30 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 8ceeb824c4..2cc2a6fece 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1198,6 +1198,7 @@ set (qpidbroker_SOURCES
qpid/broker/ExchangeRegistry.cpp
qpid/broker/FanOutExchange.cpp
qpid/broker/HeadersExchange.cpp
+ qpid/broker/IngressCompletion.cpp
qpid/broker/Link.cpp
qpid/broker/LinkRegistry.cpp
qpid/broker/LossyQueue.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 9c2ca83214..1c129e196a 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -635,6 +635,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HeadersExchange.cpp \
qpid/broker/HeadersExchange.h \
qpid/broker/AsyncCompletion.h \
+ qpid/broker/IngressCompletion.h \
+ qpid/broker/IngressCompletion.cpp \
qpid/broker/IndexedDeque.h \
qpid/broker/Link.cpp \
qpid/broker/Link.h \
diff --git a/cpp/src/qpid/broker/IngressCompletion.cpp b/cpp/src/qpid/broker/IngressCompletion.cpp
new file mode 100644
index 0000000000..5affd2b940
--- /dev/null
+++ b/cpp/src/qpid/broker/IngressCompletion.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "IngressCompletion.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+IngressCompletion::~IngressCompletion() {}
+
+void IngressCompletion::enqueueAsync(boost::shared_ptr<Queue> q)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queues.push_back(q);
+}
+
+void IngressCompletion::flush()
+{
+ Queues copy;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queues.swap(copy);
+ }
+ for (Queues::const_iterator i = copy.begin(); i != copy.end(); ++i) {
+ (*i)->flush();
+ }
+}
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/IngressCompletion.h b/cpp/src/qpid/broker/IngressCompletion.h
new file mode 100644
index 0000000000..6fdbb12aa1
--- /dev/null
+++ b/cpp/src/qpid/broker/IngressCompletion.h
@@ -0,0 +1,51 @@
+#ifndef QPID_BROKER_INGRESSCOMPLETION_H
+#define QPID_BROKER_INGRESSCOMPLETION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AsyncCompletion.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+class Queue;
+/**
+ * An AsyncCompletion object for async enqueues, that can be flushed
+ * when needed
+ */
+class IngressCompletion : public AsyncCompletion
+{
+ public:
+ virtual ~IngressCompletion();
+
+ void enqueueAsync(boost::shared_ptr<Queue>);
+ void flush();
+ private:
+ typedef std::vector<boost::shared_ptr<Queue> > Queues;
+ Queues queues;
+ qpid::sys::Mutex lock;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_INGRESSCOMPLETION_H*/
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index feff04712a..1da0bf7648 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -50,7 +50,7 @@ enum MessageState
class Message {
public:
- class Encoding : public AsyncCompletion
+ class Encoding : public IngressCompletion
{
public:
virtual ~Encoding() {}
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index b604f77aab..afc0976d61 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,7 +22,7 @@
#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
#include <iostream>
using namespace qpid::broker;
@@ -32,7 +33,7 @@ namespace broker {
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() : ingressCompletion(0), persistenceId(0) {}
-void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i)
+void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<IngressCompletion> i)
{
ingressCompletion = i.get();
/**
@@ -57,21 +58,13 @@ void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompleti
}
}
-
-void PersistableMessage::flush()
-{
- //TODO: is this really the right place for this?
-}
-
-
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
+void PersistableMessage::enqueueAsync(boost::shared_ptr<Queue> q)
{
enqueueStart();
+ ingressCompletion->enqueueAsync(q);
}
-bool PersistableMessage::isDequeueComplete() { return false; }
void PersistableMessage::dequeueComplete() {}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
}}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 2c33dd5ecb..85ac4a3b85 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -31,8 +31,8 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/broker/PersistableQueue.h"
-#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/IngressCompletion.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace types {
@@ -57,36 +57,27 @@ class PersistableMessage : public Persistable
* operations have completed, the transfer of this message from the client
* may be considered complete.
*/
- AsyncCompletion* ingressCompletion;
- boost::intrusive_ptr<AsyncCompletion> holder;
+ IngressCompletion* ingressCompletion;
+ boost::intrusive_ptr<IngressCompletion> holder;
mutable uint64_t persistenceId;
public:
virtual ~PersistableMessage();
PersistableMessage();
- void flush();
-
- QPID_BROKER_EXTERN void setStore(MessageStore*);
-
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
/** track the progress of a message received by the broker - see ingressCompletion above */
QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); }
- QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; }
- QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i);
+ QPID_BROKER_INLINE_EXTERN IngressCompletion& getIngressCompletion() { return *ingressCompletion; }
+ QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<IngressCompletion> i);
QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
- QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
- MessageStore* _store);
-
+ QPID_BROKER_EXTERN void enqueueAsync(boost::shared_ptr<Queue> queue);
- QPID_BROKER_EXTERN bool isDequeueComplete();
QPID_BROKER_EXTERN void dequeueComplete();
- QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
- MessageStore* _store);
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index f82fc815c9..c852a61f6e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -846,7 +846,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
// when it considers the message stored.
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
- pmsg->enqueueAsync(shared_from_this(), store);
+ pmsg->enqueueAsync(shared_from_this());
try {
store->enqueue(ctxt, pmsg, *this);
} catch (...) {