diff options
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 223 |
1 files changed, 166 insertions, 57 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 8f751add9b..2446c12f2b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,9 +26,9 @@ #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" #include "qpid/cluster/UpdateDataExchange.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/client/SessionImpl.h" #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Future.h" #include "qpid/broker/Broker.h" @@ -45,10 +45,13 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/TxOpVisitor.h" #include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxBuffer.h" +#include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -64,6 +67,7 @@ #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> +#include <iterator> #include <sstream> namespace qpid { @@ -82,11 +86,20 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. +const std::string UpdateClient::UPDATE("x-qpid.cluster-update"); +// Name for header used to carry expiration information. +const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration"; +// Headers used to flag headers/properties added by the UpdateClient so they can be +// removed on the other side. +const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props"; +const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers"; + std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { return o << "cluster(" << c.updaterId << " UPDATER)"; } -struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler +struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -120,7 +133,7 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, + broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, @@ -134,13 +147,11 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con UpdateClient::~UpdateClient() {} -// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.cluster-update"); - void UpdateClient::run() { try { connection.open(updateeUrl, connectionSettings); session = connection.newSession(UPDATE); + session.sync(); update(); done(); } catch (const std::exception& e) { @@ -154,6 +165,13 @@ void UpdateClient::update() { << " at " << updateeUrl); Broker& b = updaterBroker; + if(b.getExpiryPolicy()) { + QPID_LOG(debug, *this << "Updating updatee with cluster time"); + qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime(); + int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime); + ClusterConnectionProxy(session).clock(time); + } + updateManagementSetupState(); b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); @@ -163,16 +181,20 @@ void UpdateClient::update() { // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); + std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); + + // some Queue Observers need session state & msgs synced first, so sync observers now + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); - ClusterConnectionProxy(session).expiryId(expiry.getId()); updateLinks(); updateManagementAgent(); + updateDtxManager(); + session.queueDelete(arg::queue=UPDATE); session.close(); @@ -184,7 +206,7 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the @@ -276,7 +298,7 @@ class MessageUpdater { framing::SequenceNumber lastPos; client::AsyncSession session; ExpiryPolicy& expiry; - + public: MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) { @@ -293,7 +315,6 @@ class MessageUpdater { } } - void updateQueuedMessage(const broker::QueuedMessage& message) { // Send the queue position if necessary. if (!haveLastPos || message.position - lastPos != 1) { @@ -302,10 +323,23 @@ class MessageUpdater { } lastPos = message.position; - // Send the expiry ID if necessary. - if (message.payload->getProperties<DeliveryProperties>()->getTtl()) { - boost::optional<uint64_t> expiryId = expiry.getId(*message.payload); - ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0); + // if the ttl > 0, we need to send the calculated expiration time to the updatee + const DeliveryProperties* dprops = + message.payload->getProperties<DeliveryProperties>(); + if (dprops && dprops->getTtl() > 0) { + bool hadMessageProps = + message.payload->hasProperties<framing::MessageProperties>(); + const framing::MessageProperties* mprops = + message.payload->getProperties<framing::MessageProperties>(); + bool hadApplicationHeaders = mprops->hasApplicationHeaders(); + message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION, + sys::Duration(sys::EPOCH, message.payload->getExpiration())); + // If message properties or application headers didn't exist + // prior to us adding data, we want to remove them on the other side. + if (!hadMessageProps) + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); + else if (!hadApplicationHeaders) + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0); } // We can't send a broker::Message via the normal client API, @@ -318,7 +352,7 @@ class MessageUpdater { framing::MessageTransferBody transfer( *message.payload->getFrames().as<framing::MessageTransferBody>()); transfer.setDestination(UpdateClient::UPDATE); - + sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ @@ -326,9 +360,10 @@ class MessageUpdater { uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); - morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + morecontent = message.payload->getContentFrame( + *(message.queue), frame, maxContentSize, offset); sb.get()->sendRawFrame(frame); } } @@ -357,6 +392,8 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); } + + ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge()); } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { @@ -372,7 +409,11 @@ void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue } void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) { - s.exchangeBind(queue, binding.exchange, binding.key, binding.args); + if (binding.exchange.size()) + s.exchangeBind(queue, binding.exchange, binding.key, binding.args); + //else its the default exchange and there is no need to replicate + //the binding, the creation of the queue will have done so + //automatically } void UpdateClient::updateOutputTask(const sys::OutputTask* task) { @@ -380,8 +421,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); - ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, *this << " updating output task " << ci->getName() + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag()); + QPID_LOG(debug, *this << " updating output task " << ci->getTag() << " channel=" << channel); } @@ -389,7 +430,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); - + // Send the management ID first on the main connection. std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId(); ClusterConnectionProxy(session).shadowPrepare(mgmtId); @@ -426,7 +467,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating session " << ss->getId()); - // Create a client session to update session state. + // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); simpl->disableAutoDetach(); @@ -445,19 +486,19 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), - boost::bind(&UpdateClient::updateUnacked, this, _1)); + boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession)); - updateTxState(ss->getSemanticState()); // Tx transaction state. + updateTransactionState(ss->getSemanticState()); // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; - if (inProgress) + if (inProgress) --received; // Sync the session to ensure all responses from broker have been processed. shadowSession.sync(); - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, @@ -466,7 +507,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() + ss->receiverGetIncomplete(), + ss->getSemanticState().getDtxSelected() ); // Send frames for partial message in progress. @@ -479,13 +521,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), - arg::destination = ci->getName(), + arg::destination = ci->getTag(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, arg::exclusive = ci->isExclusive(), @@ -493,29 +535,32 @@ void UpdateClient::updateConsumer( arg::resumeTtl = ci->getResumeTtl(), arg::arguments = ci->getArguments() ); - shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit()); ClusterConnectionProxy(shadowSession).consumerState( - ci->getName(), + ci->getTag(), ci->isBlocked(), ci->isNotifyEnabled(), ci->position ); consumerNumbering.add(ci.get()); - QPID_LOG(debug, *this << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getTag() << " on " << shadowSession.getId()); } - -void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { - if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { + +void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, + client::AsyncSession& updateSession) +{ + if (!dr.isEnded() && dr.isAcquired()) { + assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. // - MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage()); } - ClusterConnectionProxy(shadowSession).deliveryRecord( + ClusterConnectionProxy(updateSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, dr.getTag(), @@ -536,10 +581,12 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} - void operator()(const broker::DtxAck& ) { - throw InternalErrorException("DTX transactions not currently supported by cluster."); + void operator()(const broker::DtxAck& ack) { + std::for_each(ack.getPending().begin(), ack.getPending().end(), + boost::bind(&UpdateClient::updateUnacked, &parent, _1, session)); + proxy.dtxAck(); } - + void operator()(const broker::RecoveredDequeue& rdeq) { updateMessage(rdeq.getMessage()); proxy.txEnqueue(rdeq.getQueue()->getName()); @@ -554,13 +601,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { proxy.txAccept(txAccept.getAcked()); } + typedef std::list<Queue::shared_ptr> QueueList; + + void copy(const QueueList& l, Array& a) { + for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i) + a.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + } + void operator()(const broker::TxPublish& txPub) { updateMessage(txPub.getMessage()); - typedef std::list<Queue::shared_ptr> QueueList; - const QueueList& qlist = txPub.getQueues(); + assert(txPub.getQueues().empty() || txPub.getPrepared().empty()); Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) - qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray); proxy.txPublish(qarray, txPub.delivered); } @@ -569,18 +621,44 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { client::AsyncSession session; ClusterConnectionProxy proxy; }; - -void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, *this << " updating TX transaction state."); + +void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) +{ + ClusterConnectionProxy proxy(shadowSession); + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); + +} + +void UpdateClient::updateTransactionState(broker::SemanticState& s) { ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); - broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); - if (txBuffer) { + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); + if (dtx) { + updateBufferRef(dtx, false); // Current transaction. + } else if (tx) { proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); - txBuffer->accept(updater); + tx->accept(updater); proxy.txEnd(); } + for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); + i != s.getSuspendedXids().end(); + ++i) + { + updateBufferRef(i->second, true); + } +} + +void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { + ClusterConnectionProxy proxy(session); + proxy.dtxStart( + dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired()); + TxOpUpdater updater(*this, session, expiry); + dtx->accept(updater); + proxy.dtxEnd(); } void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { @@ -615,4 +693,35 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) ClusterConnectionProxy(session).config(encode(*bridge)); } +void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q) +{ + q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1)); +} + +void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, + boost::shared_ptr<broker::QueueObserver> o) +{ + qpid::framing::FieldTable state; + broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (so) { + so->getState( state ); + std::string id(so->getId()); + QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); + ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); + } +} + +void UpdateClient::updateDtxManager() { + broker::DtxManager& dtm = updaterBroker.getDtxManager(); + dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1)); +} + +void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) { + QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid()); + for (size_t i = 0; i < r.size(); ++i) + updateDtxBuffer(r[i]); + ClusterConnectionProxy(session).dtxWorkRecord( + r.getXid(), r.isPrepared(), r.getTimeout()); +} + }} // namespace qpid::cluster |