summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp56
1 files changed, 15 insertions, 41 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 6cad003605..15332c2cac 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -128,15 +128,11 @@ Cluster::~Cluster() {
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
}
-bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Lock l(lock);
- bool result = connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second;
- assert(result);
- return result;
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ connections.insert(c->getId(), c);
}
void Cluster::erase(ConnectionId id) {
- Lock l(lock);
connections.erase(id);
}
@@ -226,29 +222,15 @@ void Cluster::leave(Lock&) {
}
}
-boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) {
- ConnectionMap::iterator i = connections.find(connectionId);
- if (i == connections.end()) {
- if (connectionId.getMember() == myId) { // Closed local connection
- QPID_LOG(debug, *this << " activity on closed connection: " << connectionId);
- return boost::intrusive_ptr<Connection>();
- }
- else { // New shadow connection
- std::ostringstream mgmtId;
- mgmtId << name << ":" << connectionId;
- ConnectionMap::value_type value(connectionId,
- new Connection(*this, shadowOut, mgmtId.str(), connectionId));
- i = connections.insert(value).first;
- }
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId) {
+ boost::intrusive_ptr<Connection> cp = connections.find(connectionId);
+ if (!cp && connectionId.getMember() != myId) { // New shadow connection
+ std::ostringstream mgmtId;
+ mgmtId << name << ":" << connectionId;
+ cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId);
+ connections.insert(connectionId, cp);
}
- return i->second;
-}
-
-Cluster::Connections Cluster::getConnections(Lock&) {
- Connections result(connections.size());
- std::transform(connections.begin(), connections.end(), result.begin(),
- boost::bind(&ConnectionMap::value_type::second, _1));
- return result;
+ return cp;
}
void Cluster::deliver(
@@ -299,7 +281,7 @@ void Cluster::delivered(const Event& e, Lock& l) {
QPID_LOG(trace, *this << " DROP: " << e);
}
else {
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
if (!connection) return;
if (e.getType() == CONTROL) {
while (frame.decode(buf)) {
@@ -479,7 +461,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
// FIXME aconway 2008-10-15: no longer need a separate control now
// that the dump control is in the deliver queue.
-void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) {
+void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) {
if (state == LEFT) return;
MemberId dumpee(dumpeeInt);
Url url(urlStr);
@@ -489,7 +471,7 @@ void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string&
deliverQueue.stop();
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
- new DumpClient(myId, dumpee, url, broker, map, getConnections(l),
+ new DumpClient(myId, dumpee, url, broker, map, connections.values(),
boost::bind(&Cluster::dumpOutDone, this),
boost::bind(&Cluster::dumpOutError, this, _1)));
}
@@ -587,16 +569,8 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_members(urlstr);
}
- //close connections belonging to members that have now been excluded
- for (ConnectionMap::iterator i = connections.begin(); i != connections.end();) {
- MemberId member = i->first.getMember();
- if (member != myId && !map.isMember(member)) {
- i->second->left();
- connections.erase(i++);
- } else {
- i++;
- }
- }
+ // Close connections belonging to members that have now been excluded
+ connections.update(myId, map);
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {