diff options
| author | Alan Conway <aconway@apache.org> | 2008-11-05 15:22:47 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-11-05 15:22:47 +0000 |
| commit | ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b (patch) | |
| tree | 9ee2e8cdcad566d355233da8b4a45b92c9f6ed3f /cpp/src/qpid/cluster | |
| parent | d3f652de187cac449e1fae4e00fce59c204f020a (diff) | |
| download | qpid-python-ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b.tar.gz | |
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array.
framing/Array:
- added some std:::vector like functions & typedefs.
- use TypeCode enums, human readable ostream << operator.
rubygen - fixed error in generation of exceptions for bad codes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 65 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 77 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/DumpClient.h | 6 |
5 files changed, 126 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 9526a33ac6..fad0563872 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -90,7 +90,4 @@ struct ClusterPlugin : public Plugin { static ClusterPlugin instance; // Static initialization. -// For test purposes. -Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; } - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index ada26ab2fb..513816735d 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -24,7 +24,11 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" +#include "qpid/broker/TxBuffer.h" #include "qpid/broker/TxPublish.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/RecoveredDequeue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AllInvoker.h" @@ -36,7 +40,7 @@ #include <boost/current_function.hpp> -// FIXME aconway 2008-11-03: +// TODO aconway 2008-11-03: // // Disproportionate amount of code here is dedicated to receiving a // brain-dump when joining a cluster and building initial @@ -113,7 +117,6 @@ bool Connection::checkUnsupported(const AMQBody& body) { std::string message; if (body.getMethod()) { switch (body.getMethod()->amqpClassId()) { - case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster."; break; case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; } } @@ -122,13 +125,13 @@ bool Connection::checkUnsupported(const AMQBody& body) { if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster."; } if (!message.empty()) - connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0); + connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0); return !message.empty(); } // Delivered from cluster. void Connection::delivered(framing::AMQFrame& f) { - QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f); + QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f); assert(!catchUp); currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol. @@ -247,11 +250,15 @@ bool Connection::isDumped() const { return self.first == cluster.getId() && self.second == 0; } + +shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { + shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname); + if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname)); + return queue; +} + broker::QueuedMessage Connection::getDumpMessage() { - // Get a message from the DUMP queue. - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue")); - broker::QueuedMessage m = dumpQueue->get(); + broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get(); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); return m; } @@ -267,14 +274,11 @@ void Connection::deliveryRecord(const string& qname, bool ended, bool windowing) { - broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname); - if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname)); broker::QueuedMessage m; + broker::Queue::shared_ptr queue = findQueue(qname); if (!ended) { // Has a message - if (acquired) { // Message at front of dump queue - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - m = dumpQueue->get(); - } + if (acquired) // Message is on the dump queue + m = getDumpMessage(); else // Message at original position in original queue m = queue->find(position); if (!m.payload) @@ -286,8 +290,7 @@ void Connection::deliveryRecord(const string& qname, if (cancelled) dr.cancel(dr.getTag()); if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - - semanticState().record(dr); + semanticState().record(dr); // Part of the session's unacked list. } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -304,6 +307,36 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } +void Connection::txStart() { + txBuffer = make_shared_ptr(new broker::TxBuffer()); +} +void Connection::txAccept(const framing::SequenceSet& acked) { + txBuffer->enlist(make_shared_ptr(new broker::TxAccept(acked, semanticState().getUnacked()))); +} + +void Connection::txDequeue(const std::string& queue) { + txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload))); +} + +void Connection::txEnqueue(const std::string& queue) { + txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload))); +} + +void Connection::txPublish(const framing::Array& queues, bool delivered) { + boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload)); + for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) + txPub->deliverTo(findQueue((*i)->get<std::string>())); + txPub->delivered = delivered; + txBuffer->enlist(txPub); +} + +void Connection::txEnd() { + semanticState().setTxBuffer(txBuffer); +} + +void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { + semanticState().setAccumulatedAck(s); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 331ac33ab0..2eafa90f32 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -125,12 +125,21 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); + void txStart(); + void txAccept(const framing::SequenceSet&); + void txDequeue(const std::string&); + void txEnqueue(const std::string&); + void txPublish(const qpid::framing::Array&, bool); + void txEnd(); + void accumulatedAck(const qpid::framing::SequenceSet&); + private: bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); void deliverDoOutput(uint32_t requested); void sendDoOutput(); + boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); broker::SessionState& sessionState(); broker::SemanticState& semanticState(); broker::QueuedMessage getDumpMessage(); @@ -148,6 +157,7 @@ class Connection : framing::SequenceNumber mcastSeq; framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; + boost::shared_ptr<broker::TxBuffer> txBuffer; friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index a2860f6f32..bb3cfdfa56 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -32,6 +32,12 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/SessionState.h" +#include "qpid/broker/TxOpVisitor.h" +#include "qpid/broker/DtxAck.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/broker/TxPublish.h" +#include "qpid/broker/RecoveredDequeue.h" +#include "qpid/broker/RecoveredEnqueue.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -43,7 +49,7 @@ #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> - +#include <algorithm> namespace qpid { namespace cluster { @@ -198,7 +204,7 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn shadowConnection = catchUpConnection(); broker::Connection& bc = dumpConnection->getBrokerConnection(); - // FIXME aconway 2008-10-20: What authentication info to reconnect? + // FIXME aconway 2008-10-20: What authentication info to use on reconnect? shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( @@ -227,7 +233,10 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); QPID_LOG(debug, dumperId << " dumping unacknowledged messages."); - ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); + broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); + std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1)); + + dumpTxState(ss->getSemanticState()); // Tx transaction state. // Adjust for command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); @@ -283,22 +292,12 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { } void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { - dumpDeliveryRecordMessage(dr); - dumpDeliveryRecord(dr); -} - -void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) { - // Dump the message associated with a dr if need be. if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the // dumpees queue, put it on the dump queue for dumpee to pick up. // MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage()); } -} - -void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) { - // Assumes the associated message has already been dumped (if needed) ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, @@ -312,4 +311,56 @@ void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) { dr.isWindowing()); } +class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper { + public: + TxOpDumper(DumpClient& dc, client::AsyncSession s) + : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {} + + void operator()(const broker::DtxAck& ) { + throw InternalErrorException("DTX transactions not currently supported by cluster."); + } + + void operator()(const broker::RecoveredDequeue& rdeq) { + dumpMessage(rdeq.getMessage()); + proxy.txEnqueue(rdeq.getQueue()->getName()); + } + + void operator()(const broker::RecoveredEnqueue& renq) { + dumpMessage(renq.getMessage()); + proxy.txEnqueue(renq.getQueue()->getName()); + } + + void operator()(const broker::TxAccept& txAccept) { + proxy.txAccept(txAccept.getAcked()); + } + + void operator()(const broker::TxPublish& txPub) { + dumpMessage(txPub.getMessage()); + typedef std::list<Queue::shared_ptr> QueueList; + const QueueList& qlist = txPub.getQueues(); + Array qarray(TYPE_CODE_STR8); + for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) + qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + proxy.txPublish(qarray, txPub.delivered); + } + + private: + DumpClient& parent; + client::AsyncSession session; + ClusterConnectionProxy proxy; +}; + +void DumpClient::dumpTxState(broker::SemanticState& s) { + QPID_LOG(debug, dumperId << " dumping TX transaction state."); + ClusterConnectionProxy proxy(shadowSession); + proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); + if (txBuffer) { + proxy.txStart(); + TxOpDumper dumper(*this, shadowSession); + txBuffer->accept(dumper); + proxy.txEnd(); + } +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h index 716e7dcc3a..23676e7646 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -71,6 +71,8 @@ class DumpClient : public sys::Runnable { void dump(); void run(); // Will delete this when finished. + void dumpUnacked(const broker::DeliveryRecord&); + private: void dumpQueue(const boost::shared_ptr<broker::Queue>&); void dumpExchange(const boost::shared_ptr<broker::Exchange>&); @@ -79,10 +81,8 @@ class DumpClient : public sys::Runnable { void dumpBinding(const std::string& queue, const broker::QueueBinding& binding); void dumpConnection(const boost::intrusive_ptr<Connection>& connection); void dumpSession(broker::SessionHandler& s); + void dumpTxState(broker::SemanticState& s); void dumpConsumer(const broker::SemanticState::ConsumerImpl*); - void dumpUnacked(const broker::DeliveryRecord&); - void dumpDeliveryRecord(const broker::DeliveryRecord&); - void dumpDeliveryRecordMessage(const broker::DeliveryRecord&); MemberId dumperId; MemberId dumpeeId; |
