diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 56 |
1 files changed, 15 insertions, 41 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 6cad003605..15332c2cac 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -128,15 +128,11 @@ Cluster::~Cluster() { if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. } -bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) { - Lock l(lock); - bool result = connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second; - assert(result); - return result; +void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { + connections.insert(c->getId(), c); } void Cluster::erase(ConnectionId id) { - Lock l(lock); connections.erase(id); } @@ -226,29 +222,15 @@ void Cluster::leave(Lock&) { } } -boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { - ConnectionMap::iterator i = connections.find(connectionId); - if (i == connections.end()) { - if (connectionId.getMember() == myId) { // Closed local connection - QPID_LOG(debug, *this << " activity on closed connection: " << connectionId); - return boost::intrusive_ptr<Connection>(); - } - else { // New shadow connection - std::ostringstream mgmtId; - mgmtId << name << ":" << connectionId; - ConnectionMap::value_type value(connectionId, - new Connection(*this, shadowOut, mgmtId.str(), connectionId)); - i = connections.insert(value).first; - } +boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId) { + boost::intrusive_ptr<Connection> cp = connections.find(connectionId); + if (!cp && connectionId.getMember() != myId) { // New shadow connection + std::ostringstream mgmtId; + mgmtId << name << ":" << connectionId; + cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId); + connections.insert(connectionId, cp); } - return i->second; -} - -Cluster::Connections Cluster::getConnections(Lock&) { - Connections result(connections.size()); - std::transform(connections.begin(), connections.end(), result.begin(), - boost::bind(&ConnectionMap::value_type::second, _1)); - return result; + return cp; } void Cluster::deliver( @@ -299,7 +281,7 @@ void Cluster::delivered(const Event& e, Lock& l) { QPID_LOG(trace, *this << " DROP: " << e); } else { - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); if (!connection) return; if (e.getType() == CONTROL) { while (frame.decode(buf)) { @@ -479,7 +461,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& // FIXME aconway 2008-10-15: no longer need a separate control now // that the dump control is in the deliver queue. -void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) { +void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) { if (state == LEFT) return; MemberId dumpee(dumpeeInt); Url url(urlStr); @@ -489,7 +471,7 @@ void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& deliverQueue.stop(); if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. dumpThread = Thread( - new DumpClient(myId, dumpee, url, broker, map, getConnections(l), + new DumpClient(myId, dumpee, url, broker, map, connections.values(), boost::bind(&Cluster::dumpOutDone, this), boost::bind(&Cluster::dumpOutError, this, _1))); } @@ -587,16 +569,8 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_members(urlstr); } - //close connections belonging to members that have now been excluded - for (ConnectionMap::iterator i = connections.begin(); i != connections.end();) { - MemberId member = i->first.getMember(); - if (member != myId && !map.isMember(member)) { - i->second->left(); - connections.erase(i++); - } else { - i++; - } - } + // Close connections belonging to members that have now been excluded + connections.update(myId, map); } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { |
