summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-05 15:22:47 +0000
committerAlan Conway <aconway@apache.org>2008-11-05 15:22:47 +0000
commitad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b (patch)
tree9ee2e8cdcad566d355233da8b4a45b92c9f6ed3f /cpp/src/qpid/broker
parentd3f652de187cac449e1fae4e00fce59c204f020a (diff)
downloadqpid-python-ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b.tar.gz
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array. framing/Array: - added some std:::vector like functions & typedefs. - use TypeCode enums, human readable ostream << operator. rubygen - fixed error in generation of exceptions for bad codes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/DtxAck.h1
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.h4
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.h5
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.h9
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp2
-rw-r--r--cpp/src/qpid/broker/TxAccept.h10
-rw-r--r--cpp/src/qpid/broker/TxBuffer.cpp5
-rw-r--r--cpp/src/qpid/broker/TxBuffer.h3
-rw-r--r--cpp/src/qpid/broker/TxOp.h10
-rw-r--r--cpp/src/qpid/broker/TxOpVisitor.h100
-rw-r--r--cpp/src/qpid/broker/TxPublish.h4
12 files changed, 145 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h
index 05c4499839..d43532906a 100644
--- a/cpp/src/qpid/broker/DtxAck.h
+++ b/cpp/src/qpid/broker/DtxAck.h
@@ -39,6 +39,7 @@ namespace qpid {
virtual void commit() throw();
virtual void rollback() throw();
virtual ~DtxAck(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
};
}
}
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h
index 276e1f4c5c..ef6b8f1f74 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.h
+++ b/cpp/src/qpid/broker/RecoveredDequeue.h
@@ -45,6 +45,10 @@ namespace qpid {
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredDequeue(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+ Queue::shared_ptr getQueue() const { return queue; }
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
};
}
}
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h
index 6525179769..2d97768e65 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.h
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.h
@@ -45,6 +45,11 @@ namespace qpid {
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredEnqueue(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+ Queue::shared_ptr getQueue() const { return queue; }
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
+
};
}
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 22f6316974..73dfc8cde8 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -436,7 +436,7 @@ void SemanticState::recover(bool requeue)
if(requeue){
//take copy and clear unacked as requeue may result in redelivery to this session
//which will in turn result in additions to unacked
- std::list<DeliveryRecord> copy = unacked;
+ DeliveryRecords copy = unacked;
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index dbb3e1d3b6..340017ddf0 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -134,7 +134,7 @@ class SemanticState : public sys::OutputTask,
DeliveryAdapter& deliveryAdapter;
ConsumerImplMap consumers;
NameGenerator tagGenerator;
- std::list<DeliveryRecord> unacked;
+ DeliveryRecords unacked;
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
@@ -216,8 +216,11 @@ class SemanticState : public sys::OutputTask,
static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
- template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(), f); }
-
+ DeliveryRecords& getUnacked() { return unacked; }
+ framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
+ TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+ void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+ void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
};
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index 6d307bf735..594a466453 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -69,7 +69,7 @@ void TxAccept::RangeOps::commit()
}
}
-TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
+TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
acked(_acked), unacked(_unacked), ops(unacked)
{
//populate the ops
diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h
index 5474327f7c..0a5fdedb0a 100644
--- a/cpp/src/qpid/broker/TxAccept.h
+++ b/cpp/src/qpid/broker/TxAccept.h
@@ -56,8 +56,8 @@ namespace qpid {
void commit();
};
- framing::SequenceSet& acked;
- std::list<DeliveryRecord>& unacked;
+ framing::SequenceSet acked;
+ DeliveryRecords& unacked;
RangeOps ops;
public:
@@ -66,11 +66,15 @@ namespace qpid {
* acks received
* @param unacked the record of delivered messages
*/
- TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked);
+ TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~TxAccept(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+ // Used by cluster replication.
+ const framing::SequenceSet& getAcked() const { return acked; }
};
}
}
diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp
index 8fe2c17bf0..ae18e0f318 100644
--- a/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/cpp/src/qpid/broker/TxBuffer.cpp
@@ -22,6 +22,7 @@
#include "qpid/log/Statement.h"
#include <boost/mem_fn.hpp>
+#include <boost/bind.hpp>
using boost::mem_fn;
using namespace qpid::broker;
@@ -73,3 +74,7 @@ bool TxBuffer::commitLocal(TransactionalStore* const store)
}
return false;
}
+
+void TxBuffer::accept(TxOpConstVisitor& v) const {
+ std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
+}
diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h
index 361c47e92c..aabb5ea0b1 100644
--- a/cpp/src/qpid/broker/TxBuffer.h
+++ b/cpp/src/qpid/broker/TxBuffer.h
@@ -107,6 +107,9 @@ namespace qpid {
* commit
*/
bool commitLocal(TransactionalStore* const store);
+
+ // Used by cluster to replicate transaction status.
+ void accept(TxOpConstVisitor& v) const;
};
}
}
diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h
index e687c437cc..5265478e36 100644
--- a/cpp/src/qpid/broker/TxOp.h
+++ b/cpp/src/qpid/broker/TxOp.h
@@ -21,11 +21,15 @@
#ifndef _TxOp_
#define _TxOp_
+#include "TxOpVisitor.h"
#include "TransactionalStore.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+
+class TxOpConstVisitor;
+
class TxOp{
public:
typedef boost::shared_ptr<TxOp> shared_ptr;
@@ -34,9 +38,11 @@ namespace qpid {
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
virtual ~TxOp(){}
+
+ virtual void accept(TxOpConstVisitor&) const = 0;
};
- }
-}
+
+}} // namespace qpid::broker
#endif
diff --git a/cpp/src/qpid/broker/TxOpVisitor.h b/cpp/src/qpid/broker/TxOpVisitor.h
new file mode 100644
index 0000000000..a5f2a018c9
--- /dev/null
+++ b/cpp/src/qpid/broker/TxOpVisitor.h
@@ -0,0 +1,100 @@
+#ifndef QPID_BROKER_TXOPVISITOR_H
+#define QPID_BROKER_TXOPVISITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxAck;
+class RecoveredDequeue;
+class RecoveredEnqueue;
+class TxAccept;
+class TxPublish;
+
+/**
+ * Visitor for TxOp familly of classes.
+ */
+struct TxOpConstVisitor
+{
+ virtual ~TxOpConstVisitor() {}
+ virtual void operator()(const DtxAck&) = 0;
+ virtual void operator()(const RecoveredDequeue&) = 0;
+ virtual void operator()(const RecoveredEnqueue&) = 0;
+ virtual void operator()(const TxAccept&) = 0;
+ virtual void operator()(const TxPublish&) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_TXOPVISITOR_H*/
+#ifndef QPID_BROKER_TXOPVISITOR_H
+#define QPID_BROKER_TXOPVISITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxAck;
+class RecoveredDequeue;
+class RecoveredEnqueue;
+class TxAccept;
+class TxPublish;
+
+/**
+ * Visitor for TxOp familly of classes.
+ */
+struct TxOpConstVisitor
+{
+ virtual ~TxOpConstVisitor() {}
+ virtual void operator()(const DtxAck&) = 0;
+ virtual void operator()(const RecoveredDequeue&) = 0;
+ virtual void operator()(const RecoveredEnqueue&) = 0;
+ virtual void operator()(const TxAccept&) = 0;
+ virtual void operator()(const TxPublish&) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_TXOPVISITOR_H*/
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index 018437f1ed..1f73cb8767 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -75,8 +75,12 @@ namespace qpid {
virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
virtual ~TxPublish(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
uint64_t contentSize();
+
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
+ const std::list<Queue::shared_ptr> getQueues() const { return queues; }
};
}
}