From bab8070ad7989386b11f4106d9f15e73d9246c1d Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Sun, 8 Mar 2009 23:52:35 +0000 Subject: Fixed race conditions in cluster. Execute all cluster logic in frameDeliverQueue thread, decoding only in eventDeliverQueue thread. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@751557 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/UpdateClient.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp') diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index d5e6635c45..cf1633e40b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -22,6 +22,7 @@ #include "Cluster.h" #include "ClusterMap.h" #include "Connection.h" +#include "Decoder.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/broker/Broker.h" @@ -86,14 +87,14 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_, - const Cluster::Connections& cons, + broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function& ok, const boost::function& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - eventId(eventId_), frameId(frameId_), connections(cons), + frameId(frameId_), connections(cons), decoder(decoder_), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) { @@ -130,7 +131,6 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); - membership.setEventId(eventId); membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -232,7 +232,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr& upda connectionSettings.maxFrameSize = bc.getFrameMax(); shadowConnection.open(updateeUrl, connectionSettings); bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); - std::pair fragment = updateConnection->getDecoder().getFragment(); + // Safe to use decoder here because we are stalled for update. + std::pair fragment = decoder.get(updateConnection->getId()).getFragment(); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), updateConnection->getId().getNumber(), -- cgit v1.2.1