summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-03-08 23:52:35 +0000
committerAlan Conway <aconway@apache.org>2009-03-08 23:52:35 +0000
commitbab8070ad7989386b11f4106d9f15e73d9246c1d (patch)
treedfca93bc4ca56de921c01f87e966855816744484 /cpp/src/qpid/cluster/UpdateClient.cpp
parente14ed937f459d07735a5ed22636127fdf81dc88c (diff)
downloadqpid-python-bab8070ad7989386b11f4106d9f15e73d9246c1d.tar.gz
Fixed race conditions in cluster.
Execute all cluster logic in frameDeliverQueue thread, decoding only in eventDeliverQueue thread. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@751557 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp11
1 files changed, 6 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index d5e6635c45..cf1633e40b 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -22,6 +22,7 @@
#include "Cluster.h"
#include "ClusterMap.h"
#include "Connection.h"
+#include "Decoder.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/broker/Broker.h"
@@ -86,14 +87,14 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_,
- const Cluster::Connections& cons,
+ broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
- eventId(eventId_), frameId(frameId_), connections(cons),
+ frameId(frameId_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
{
@@ -130,7 +131,6 @@ void UpdateClient::update() {
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
- membership.setEventId(eventId);
membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -232,7 +232,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
connectionSettings.maxFrameSize = bc.getFrameMax();
shadowConnection.open(updateeUrl, connectionSettings);
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
- std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment();
+ // Safe to use decoder here because we are stalled for update.
+ std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),