diff options
| author | Alan Conway <aconway@apache.org> | 2008-11-05 15:22:47 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-11-05 15:22:47 +0000 |
| commit | ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b (patch) | |
| tree | 9ee2e8cdcad566d355233da8b4a45b92c9f6ed3f /cpp/src/qpid/broker | |
| parent | d3f652de187cac449e1fae4e00fce59c204f020a (diff) | |
| download | qpid-python-ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b.tar.gz | |
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array.
framing/Array:
- added some std:::vector like functions & typedefs.
- use TypeCode enums, human readable ostream << operator.
rubygen - fixed error in generation of exceptions for bad codes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/DtxAck.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/RecoveredDequeue.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/RecoveredEnqueue.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxAccept.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxBuffer.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxBuffer.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxOp.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxOpVisitor.h | 100 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxPublish.h | 4 |
12 files changed, 145 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h index 05c4499839..d43532906a 100644 --- a/cpp/src/qpid/broker/DtxAck.h +++ b/cpp/src/qpid/broker/DtxAck.h @@ -39,6 +39,7 @@ namespace qpid { virtual void commit() throw(); virtual void rollback() throw(); virtual ~DtxAck(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } }; } } diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h index 276e1f4c5c..ef6b8f1f74 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -45,6 +45,10 @@ namespace qpid { virtual void commit() throw(); virtual void rollback() throw(); virtual ~RecoveredDequeue(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + + Queue::shared_ptr getQueue() const { return queue; } + boost::intrusive_ptr<Message> getMessage() const { return msg; } }; } } diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h index 6525179769..2d97768e65 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -45,6 +45,11 @@ namespace qpid { virtual void commit() throw(); virtual void rollback() throw(); virtual ~RecoveredEnqueue(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + + Queue::shared_ptr getQueue() const { return queue; } + boost::intrusive_ptr<Message> getMessage() const { return msg; } + }; } } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 22f6316974..73dfc8cde8 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -436,7 +436,7 @@ void SemanticState::recover(bool requeue) if(requeue){ //take copy and clear unacked as requeue may result in redelivery to this session //which will in turn result in additions to unacked - std::list<DeliveryRecord> copy = unacked; + DeliveryRecords copy = unacked; unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index dbb3e1d3b6..340017ddf0 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -134,7 +134,7 @@ class SemanticState : public sys::OutputTask, DeliveryAdapter& deliveryAdapter; ConsumerImplMap consumers; NameGenerator tagGenerator; - std::list<DeliveryRecord> unacked; + DeliveryRecords unacked; TxBuffer::shared_ptr txBuffer; DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; @@ -216,8 +216,11 @@ class SemanticState : public sys::OutputTask, static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); } template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); } - template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(), f); } - + DeliveryRecords& getUnacked() { return unacked; } + framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; } + TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } + void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; } + void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); }; diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index 6d307bf735..594a466453 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -69,7 +69,7 @@ void TxAccept::RangeOps::commit() } } -TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : +TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) : acked(_acked), unacked(_unacked), ops(unacked) { //populate the ops diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h index 5474327f7c..0a5fdedb0a 100644 --- a/cpp/src/qpid/broker/TxAccept.h +++ b/cpp/src/qpid/broker/TxAccept.h @@ -56,8 +56,8 @@ namespace qpid { void commit(); }; - framing::SequenceSet& acked; - std::list<DeliveryRecord>& unacked; + framing::SequenceSet acked; + DeliveryRecords& unacked; RangeOps ops; public: @@ -66,11 +66,15 @@ namespace qpid { * acks received * @param unacked the record of delivered messages */ - TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked); + TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); virtual ~TxAccept(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + + // Used by cluster replication. + const framing::SequenceSet& getAcked() const { return acked; } }; } } diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp index 8fe2c17bf0..ae18e0f318 100644 --- a/cpp/src/qpid/broker/TxBuffer.cpp +++ b/cpp/src/qpid/broker/TxBuffer.cpp @@ -22,6 +22,7 @@ #include "qpid/log/Statement.h" #include <boost/mem_fn.hpp> +#include <boost/bind.hpp> using boost::mem_fn; using namespace qpid::broker; @@ -73,3 +74,7 @@ bool TxBuffer::commitLocal(TransactionalStore* const store) } return false; } + +void TxBuffer::accept(TxOpConstVisitor& v) const { + std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); +} diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h index 361c47e92c..aabb5ea0b1 100644 --- a/cpp/src/qpid/broker/TxBuffer.h +++ b/cpp/src/qpid/broker/TxBuffer.h @@ -107,6 +107,9 @@ namespace qpid { * commit */ bool commitLocal(TransactionalStore* const store); + + // Used by cluster to replicate transaction status. + void accept(TxOpConstVisitor& v) const; }; } } diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h index e687c437cc..5265478e36 100644 --- a/cpp/src/qpid/broker/TxOp.h +++ b/cpp/src/qpid/broker/TxOp.h @@ -21,11 +21,15 @@ #ifndef _TxOp_ #define _TxOp_ +#include "TxOpVisitor.h" #include "TransactionalStore.h" #include <boost/shared_ptr.hpp> namespace qpid { namespace broker { + +class TxOpConstVisitor; + class TxOp{ public: typedef boost::shared_ptr<TxOp> shared_ptr; @@ -34,9 +38,11 @@ namespace qpid { virtual void commit() throw() = 0; virtual void rollback() throw() = 0; virtual ~TxOp(){} + + virtual void accept(TxOpConstVisitor&) const = 0; }; - } -} + +}} // namespace qpid::broker #endif diff --git a/cpp/src/qpid/broker/TxOpVisitor.h b/cpp/src/qpid/broker/TxOpVisitor.h new file mode 100644 index 0000000000..a5f2a018c9 --- /dev/null +++ b/cpp/src/qpid/broker/TxOpVisitor.h @@ -0,0 +1,100 @@ +#ifndef QPID_BROKER_TXOPVISITOR_H +#define QPID_BROKER_TXOPVISITOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/shared_ptr.h" + +namespace qpid { +namespace broker { + +class DtxAck; +class RecoveredDequeue; +class RecoveredEnqueue; +class TxAccept; +class TxPublish; + +/** + * Visitor for TxOp familly of classes. + */ +struct TxOpConstVisitor +{ + virtual ~TxOpConstVisitor() {} + virtual void operator()(const DtxAck&) = 0; + virtual void operator()(const RecoveredDequeue&) = 0; + virtual void operator()(const RecoveredEnqueue&) = 0; + virtual void operator()(const TxAccept&) = 0; + virtual void operator()(const TxPublish&) = 0; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_TXOPVISITOR_H*/ +#ifndef QPID_BROKER_TXOPVISITOR_H +#define QPID_BROKER_TXOPVISITOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/shared_ptr.h" + +namespace qpid { +namespace broker { + +class DtxAck; +class RecoveredDequeue; +class RecoveredEnqueue; +class TxAccept; +class TxPublish; + +/** + * Visitor for TxOp familly of classes. + */ +struct TxOpConstVisitor +{ + virtual ~TxOpConstVisitor() {} + virtual void operator()(const DtxAck&) = 0; + virtual void operator()(const RecoveredDequeue&) = 0; + virtual void operator()(const RecoveredEnqueue&) = 0; + virtual void operator()(const TxAccept&) = 0; + virtual void operator()(const TxPublish&) = 0; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_TXOPVISITOR_H*/ diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index 018437f1ed..1f73cb8767 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -75,8 +75,12 @@ namespace qpid { virtual void deliverTo(const boost::shared_ptr<Queue>& queue); virtual ~TxPublish(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } uint64_t contentSize(); + + boost::intrusive_ptr<Message> getMessage() const { return msg; } + const std::list<Queue::shared_ptr> getQueues() const { return queues; } }; } } |
