diff options
| author | Alan Conway <aconway@apache.org> | 2011-08-25 20:41:28 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-08-25 20:41:28 +0000 |
| commit | 2fdd2cc2ade41e213ae35818532574bbf40f4a00 (patch) | |
| tree | 42fb45022ea08fee157abf50713b452acf5eda5d /cpp/src/qpid/cluster | |
| parent | 7f99badd1c330b3a6032b15a13aca1cde81274d3 (diff) | |
| download | qpid-python-2fdd2cc2ade41e213ae35818532574bbf40f4a00.tar.gz | |
QPID-3384: Enable DTX transactions in a cluster.
- Replicate DTX state to new members joining.
- Use cluster timer for DTX timeouts.
- Incidental: quote nulls in qpid::Msg messages (XIDs often have null characters)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1161742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 140 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 81 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateReceiver.h | 7 |
6 files changed, 198 insertions, 66 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 82ed8bf8c9..1c398d63f4 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -57,12 +57,12 @@ * - management::ManagementBroker: uses MessageHandler supplied by cluster * to send messages to the broker via the cluster. * - * - Dtx: not yet supported with cluster. - * - * cluster::ExpiryPolicy implements the strategy for message expiry. + * cluster::ExpiryPolicy uses cluster time. * * ClusterTimer implements periodic timed events in the cluster context. - * Used for periodic management events. + * Used for: + * - periodic management events. + * - DTX transaction timeouts. * * <h1>CLUSTER PROTOCOL OVERVIEW</h1> * @@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1128070; +const uint32_t Cluster::CLUSTER_VERSION = 1159329; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 030d6e34c1..0691aae711 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -24,6 +24,8 @@ #include "Cluster.h" #include "UpdateReceiver.h" #include "qpid/assert.h" +#include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxBuffer.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/TxBuffer.h" @@ -114,7 +116,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (!updateIn.nextShadowMgmtId.empty()) connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); - } + } init(); QPID_LOG(debug, cluster << " local connection " << *this); } @@ -167,7 +169,7 @@ void Connection::announce( AMQFrame frame; while (frame.decode(buf)) connection->received(frame); - connection->setUserId(username); + connection->setUserId(username); } // Do managment actions now that the connection is replicated. connection->raiseConnectEvent(); @@ -214,16 +216,9 @@ void Connection::received(framing::AMQFrame& f) { } } -bool Connection::checkUnsupported(const AMQBody& body) { - std::string message; - if (body.getMethod()) { - switch (body.getMethod()->amqpClassId()) { - case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; - } - } - if (!message.empty()) - connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message); - return !message.empty(); +bool Connection::checkUnsupported(const AMQBody&) { + // Throw an exception for unsupported commands. Currently all are supported. + return false; } struct GiveReadCreditOnExit { @@ -464,11 +459,21 @@ void Connection::shadowReady( output.setSendMax(sendMax); } +void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) { + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID + uint32_t index = v.first.second; // Index + v.second->setDtxBuffer((*record)[index]); +} + +// Marks the end of the update. void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); updateIn.consumerNumbering.clear(); + for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(), + boost::bind(&Connection::setDtxBuffer, this, _1)); closeUpdated(); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); } @@ -536,8 +541,16 @@ void Connection::deliveryRecord(const string& qname, } else { // Message at original position in original queue m = queue->find(position); } - if (!m.payload) - throw Exception(QPID_MSG("deliveryRecord no update message")); + // FIXME aconway 2011-08-19: removed: + // if (!m.payload) + // throw Exception(QPID_MSG("deliveryRecord no update message")); + // + // It seems this could happen legitimately in the case one + // session browses message M, then another session acquires + // it. In that case the browsers delivery record is !acquired + // but the message is not on its original Queue. In that case + // we'll get a deliveryRecord with no payload for the browser. + // } broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); @@ -545,7 +558,11 @@ void Connection::deliveryRecord(const string& qname, if (cancelled) dr.cancel(dr.getTag()); if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - semanticState().record(dr); // Part of the session's unacked list. + + if (dtxBuffer) // Record for next dtx-ack + dtxAckRecords.push_back(dr); + else + semanticState().record(dr); // Record on session's unacked list. } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -561,29 +578,29 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri namespace { - // find a StatefulQueueObserver that matches a given identifier - class ObserverFinder { - const std::string id; - boost::shared_ptr<broker::QueueObserver> target; - ObserverFinder(const ObserverFinder&) {} - public: - ObserverFinder(const std::string& _id) : id(_id) {} - broker::StatefulQueueObserver *getObserver() - { - if (target) - return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); - return 0; - } - void operator() (boost::shared_ptr<broker::QueueObserver> o) - { - if (!target) { - broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); - if (p && p->getId() == id) { - target = o; - } +// find a StatefulQueueObserver that matches a given identifier +class ObserverFinder { + const std::string id; + boost::shared_ptr<broker::QueueObserver> target; + ObserverFinder(const ObserverFinder&) {} + public: + ObserverFinder(const std::string& _id) : id(_id) {} + broker::StatefulQueueObserver *getObserver() + { + if (target) + return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); + return 0; + } + void operator() (boost::shared_ptr<broker::QueueObserver> o) + { + if (!target) { + broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (p && p->getId() == id) { + target = o; } } - }; + } +}; } @@ -615,6 +632,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { void Connection::txStart() { txBuffer.reset(new broker::TxBuffer()); } + void Connection::txAccept(const framing::SequenceSet& acked) { txBuffer->enlist(boost::shared_ptr<broker::TxAccept>( new broker::TxAccept(acked, semanticState().getUnacked()))); @@ -630,8 +648,10 @@ void Connection::txEnqueue(const std::string& queue) { new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); } -void Connection::txPublish(const framing::Array& queues, bool delivered) { - boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); +void Connection::txPublish(const framing::Array& queues, bool delivered) +{ + boost::shared_ptr<broker::TxPublish> txPub( + new broker::TxPublish(getUpdateMessage().payload)); for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get<std::string>())); txPub->delivered = delivered; @@ -646,6 +666,50 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { semanticState().setAccumulatedAck(s); } +void Connection::dtxStart(const std::string& xid, + bool ended, + bool suspended, + bool failed, + bool expired) +{ + dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired)); + txBuffer = dtxBuffer; +} + +void Connection::dtxEnd() { + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + std::string xid = dtxBuffer->getXid(); + if (mgr.exists(xid)) + mgr.join(xid, dtxBuffer); + else + mgr.start(xid, dtxBuffer); + dtxBuffer.reset(); + txBuffer.reset(); +} + +// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords +void Connection::dtxAck() { + dtxBuffer->enlist( + boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords))); + dtxAckRecords.clear(); +} + +void Connection::dtxBufferRef(const std::string& xid, uint32_t index) { + // Save the association between DtxBuffer and session so we can + // set the DtxBuffer on the session at the end of the update + // when the DtxManager has been replicated. + updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState(); +} + +// Sent at end of work record. +void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout) +{ + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + if (timeout) mgr.setTimeout(xid, timeout); + if (prepared) mgr.prepare(xid); +} + + void Connection::exchange(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index a9740f97f8..5133e4641e 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -29,6 +29,7 @@ #include "qpid/RefCounted.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SecureConnection.h" #include "qpid/broker/SemanticState.h" #include "qpid/amqp_0_10/Connection.h" @@ -164,6 +165,17 @@ class Connection : void txEnd(); void accumulatedAck(const framing::SequenceSet&); + // Dtx state + void dtxStart(const std::string& xid, + bool ended, + bool suspended, + bool failed, + bool expired); + void dtxEnd(); + void dtxAck(); + void dtxBufferRef(const std::string& xid, uint32_t index); + void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout); + // Encoded exchange replication. void exchange(const std::string& encoded); @@ -251,7 +263,7 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); void closeUpdated(); - + void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &); Cluster& cluster; ConnectionId self; bool catchUp; @@ -263,6 +275,9 @@ class Connection : framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; + boost::shared_ptr<broker::DtxBuffer> dtxBuffer; + broker::DeliveryRecords dtxAckRecords; + broker::DtxWorkRecord* dtxCurrent; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; UpdateReceiver& updateIn; diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index fc104e8ca9..a5662bb2b3 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -45,6 +45,8 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/TxOpVisitor.h" #include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxBuffer.h" +#include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" @@ -65,6 +67,7 @@ #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> +#include <iterator> #include <sstream> namespace qpid { @@ -177,9 +180,9 @@ void UpdateClient::update() { // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); + std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); // some Queue Observers need session state & msgs synced first, so sync observers now b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); @@ -189,6 +192,8 @@ void UpdateClient::update() { updateLinks(); updateManagementAgent(); + updateDtxManager(); + session.queueDelete(arg::queue=UPDATE); session.close(); @@ -356,7 +361,8 @@ class MessageUpdater { for (uint64_t offset = 0; morecontent; offset += maxContentSize) { AMQFrame frame((AMQContentBody())); - morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + morecontent = message.payload->getContentFrame( + *(message.queue), frame, maxContentSize, offset); sb.get()->sendRawFrame(frame); } } @@ -479,9 +485,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), - boost::bind(&UpdateClient::updateUnacked, this, _1)); + boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession)); - updateTxState(ss->getSemanticState()); // Tx transaction state. + updateTransactionState(ss->getSemanticState()); // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); @@ -542,14 +548,18 @@ void UpdateClient::updateConsumer( << " on " << shadowSession.getId()); } -void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { - if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { +void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, + client::AsyncSession& updateSession) +{ + if (!dr.isEnded() && dr.isAcquired()) { + // FIXME aconway 2011-08-19: should this be assert or if? + assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. // - MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage()); } - ClusterConnectionProxy(shadowSession).deliveryRecord( + ClusterConnectionProxy(updateSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, dr.getTag(), @@ -570,8 +580,10 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} - void operator()(const broker::DtxAck& ) { - throw InternalErrorException("DTX transactions not currently supported by cluster."); + void operator()(const broker::DtxAck& ack) { + std::for_each(ack.getPending().begin(), ack.getPending().end(), + boost::bind(&UpdateClient::updateUnacked, &parent, _1, session)); + proxy.dtxAck(); } void operator()(const broker::RecoveredDequeue& rdeq) { @@ -588,13 +600,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { proxy.txAccept(txAccept.getAcked()); } + typedef std::list<Queue::shared_ptr> QueueList; + + void copy(const QueueList& l, Array& a) { + for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i) + a.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + } + void operator()(const broker::TxPublish& txPub) { updateMessage(txPub.getMessage()); - typedef std::list<Queue::shared_ptr> QueueList; - const QueueList& qlist = txPub.getQueues(); + assert(txPub.getQueues().empty() || txPub.getPrepared().empty()); Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) - qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray); proxy.txPublish(qarray, txPub.delivered); } @@ -604,19 +621,33 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { ClusterConnectionProxy proxy; }; -void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, *this << " updating TX transaction state."); +void UpdateClient::updateTransactionState(broker::SemanticState& s) { + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); - broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); - if (txBuffer) { + if (dtx) { + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx)); + } else if (tx) { + ClusterConnectionProxy proxy(shadowSession); proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); - txBuffer->accept(updater); + tx->accept(updater); proxy.txEnd(); } } +void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { + ClusterConnectionProxy proxy(session); + proxy.dtxStart( + dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired()); + TxOpUpdater updater(*this, session, expiry); + dtx->accept(updater); + proxy.dtxEnd(); +} + void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { queue->getListeners().eachListener( boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1)); @@ -667,5 +698,17 @@ void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, } } +void UpdateClient::updateDtxManager() { + broker::DtxManager& dtm = updaterBroker.getDtxManager(); + dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1)); +} + +void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) { + QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid()); + for (size_t i = 0; i < r.size(); ++i) + updateDtxBuffer(r[i]); + ClusterConnectionProxy(session).dtxWorkRecord( + r.getXid(), r.isPrepared(), r.getTimeout()); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 21bf6024e0..83d4cfac81 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -52,7 +52,7 @@ class Decoder; class Link; class Bridge; class QueueObserver; - +class DtxBuffer; } // namespace broker namespace cluster { @@ -88,7 +88,7 @@ class UpdateClient : public sys::Runnable { void update(); void run(); // Will delete this when finished. - void updateUnacked(const broker::DeliveryRecord&); + void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&); private: void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&); @@ -100,7 +100,7 @@ class UpdateClient : public sys::Runnable { void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding); void updateConnection(const boost::intrusive_ptr<Connection>& connection); void updateSession(broker::SessionHandler& s); - void updateTxState(broker::SemanticState& s); + void updateTransactionState(broker::SemanticState& s); void updateOutputTask(const sys::OutputTask* task); void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); void updateQueueListeners(const boost::shared_ptr<broker::Queue>&); @@ -112,6 +112,9 @@ class UpdateClient : public sys::Runnable { void updateBridge(const boost::shared_ptr<broker::Bridge>&); void updateQueueObservers(const boost::shared_ptr<broker::Queue>&); void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>); + void updateDtxManager(); + void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& ); + void updateDtxWorkRecord(const broker::DtxWorkRecord&); Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering; diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h index 7e8ce47662..512e59e5a1 100644 --- a/cpp/src/qpid/cluster/UpdateReceiver.h +++ b/cpp/src/qpid/cluster/UpdateReceiver.h @@ -39,6 +39,13 @@ class UpdateReceiver { /** Management-id for the next shadow connection */ std::string nextShadowMgmtId; + + /** Relationship between DtxBuffers, identified by xid, index in DtxManager, + * and sessions represented by their SemanticState. + */ + typedef std::pair<std::string, uint32_t> DtxBufferRef; + typedef std::map<DtxBufferRef, broker::SemanticState* > DtxBuffers; + DtxBuffers dtxBuffers; }; }} // namespace qpid::cluster |
