diff options
| author | Alan Conway <aconway@apache.org> | 2009-03-08 23:52:35 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-03-08 23:52:35 +0000 |
| commit | bab8070ad7989386b11f4106d9f15e73d9246c1d (patch) | |
| tree | dfca93bc4ca56de921c01f87e966855816744484 /cpp/src/qpid/cluster/UpdateClient.cpp | |
| parent | e14ed937f459d07735a5ed22636127fdf81dc88c (diff) | |
| download | qpid-python-bab8070ad7989386b11f4106d9f15e73d9246c1d.tar.gz | |
Fixed race conditions in cluster.
Execute all cluster logic in frameDeliverQueue thread,
decoding only in eventDeliverQueue thread.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@751557 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index d5e6635c45..cf1633e40b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -22,6 +22,7 @@ #include "Cluster.h" #include "ClusterMap.h" #include "Connection.h" +#include "Decoder.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/broker/Broker.h" @@ -86,14 +87,14 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_, - const Cluster::Connections& cons, + broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - eventId(eventId_), frameId(frameId_), connections(cons), + frameId(frameId_), connections(cons), decoder(decoder_), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) { @@ -130,7 +131,6 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); - membership.setEventId(eventId); membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -232,7 +232,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda connectionSettings.maxFrameSize = bc.getFrameMax(); shadowConnection.open(updateeUrl, connectionSettings); bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); - std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment(); + // Safe to use decoder here because we are stalled for update. + std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment(); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), updateConnection->getId().getNumber(), |
