author    Samuel Just <sam.just@inktank.com>  2012-11-30 11:08:55 -0800
committer Samuel Just <sam.just@inktank.com>  2012-11-30 11:08:55 -0800
commit    47699f39b9c185454ff86168e4a95b6e5280ae12 (patch)
tree      7a1182384d165a96ce4a117caa73e4299d6f026d
parent    0ffafb3106d6f2d7ac7a51720af709596bf50683 (diff)
osd/: make OSDService messenger helpers return ConnectionRef
Signed-off-by: Samuel Just <sam.just@inktank.com>
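
get_con_osd_cluster() and get_con_osd_hb() previously returned a raw Connection* carrying a reference the caller had to drop with put() on every exit path; the diff below converts them to return ConnectionRef (boost::intrusive_ptr<Connection>), so the reference is released automatically when the ref goes out of scope. The sketch below is not part of the patch: it is a minimal, self-contained model of that ownership pattern, using toy Connection and get_connection() stand-ins in place of the Ceph types, and assumes only the Boost and standard headers it includes.

    // Toy model of the ConnectionRef ownership pattern introduced by this
    // commit; Connection and get_connection() are illustrative stand-ins,
    // not the Ceph classes.
    #include <boost/intrusive_ptr.hpp>
    #include <cassert>
    #include <iostream>

    struct Connection {
      int nref;
      Connection() : nref(0) {}
      void get() { ++nref; }                           // take a reference
      void put() { if (--nref == 0) delete this; }     // drop a reference
    };

    // Hooks boost::intrusive_ptr uses to manipulate the count.
    inline void intrusive_ptr_add_ref(Connection *c) { c->get(); }
    inline void intrusive_ptr_release(Connection *c) { c->put(); }

    typedef boost::intrusive_ptr<Connection> ConnectionRef;

    // Stand-in for a messenger's get_connection(): hands back a raw pointer
    // that already owns one reference.
    Connection *get_connection() {
      Connection *c = new Connection;
      c->get();
      return c;
    }

    // Old style: every caller must remember put(), including early returns.
    void old_style_caller() {
      Connection *con = get_connection();
      if (!con)
        return;
      // ... use con ...
      con->put();
    }

    // New style, mirroring get_con_osd_cluster()/get_con_osd_hb(): wrap the
    // raw pointer, then drop the extra reference taken by the constructor so
    // the ConnectionRef ends up owning the one from get_connection().
    ConnectionRef new_style_helper() {
      ConnectionRef ret(get_connection());  // refcount: 2
      ret->put();                           // refcount: 1, owned by ret
      return ret;
    }

    int main() {
      ConnectionRef con = new_style_helper();
      assert(con->nref == 1);
      std::cout << "refcount: " << con->nref << std::endl;
      return 0;  // con releases its reference when it goes out of scope
    }

Constructing the intrusive_ptr and immediately calling put(), as the new helpers do under the "Ref from get_connection" comment, has the same net effect as passing add_ref = false to the intrusive_ptr constructor: either way the single reference handed out by the messenger ends up owned by the returned ConnectionRef.
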
-rw-r--r--  src/msg/DispatchQueue.h   3
-rw-r--r--  src/msg/Message.h         6
-rw-r--r--  src/osd/OSD.cc           68
-rw-r--r--  src/osd/OSD.h             4
-rw-r--r--  src/osd/PG.cc            27
5 files changed, 54 insertions(+), 54 deletions(-)
diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h
index ad4584829d1..ea44c165d56 100644
--- a/src/msg/DispatchQueue.h
+++ b/src/msg/DispatchQueue.h
@@ -33,9 +33,6 @@ class SimpleMessenger;
class Message;
class Connection;
-typedef boost::intrusive_ptr<Connection> ConnectionRef;
-typedef boost::intrusive_ptr<Message> MessageRef;
-
/**
* The DispatchQueue contains all the Pipes which have Messages
* they want to be dispatched, carefully organized by Message priority
diff --git a/src/msg/Message.h b/src/msg/Message.h
index b6a113f771f..fc434ed9b85 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -18,6 +18,10 @@
#include <stdlib.h>
#include <ostream>
+#include <boost/intrusive_ptr.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
+
#include "include/types.h"
#include "include/buffer.h"
#include "common/Throttle.h"
@@ -261,6 +265,7 @@ public:
rx_buffers.erase(tid);
}
};
+typedef boost::intrusive_ptr<Connection> ConnectionRef;
@@ -466,6 +471,7 @@ public:
void encode(uint64_t features, bool datacrc);
};
+typedef boost::intrusive_ptr<Message> MessageRef;
extern Message *decode_message(CephContext *cct, ceph_msg_header &header,
ceph_msg_footer& footer, bufferlist& front,
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 9c57b66c4b8..feff33d4943 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1781,11 +1781,12 @@ void OSD::_add_heartbeat_peer(int p)
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
if (i == heartbeat_peers.end()) {
- Connection *con = service.get_con_osd_hb(p, osdmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch());
if (!con)
return;
hi = &heartbeat_peers[p];
- hi->con = con;
+ hi->con = con.get();
+ hi->con->get();
hi->peer = p;
hi->con->set_priv(new HeartbeatSession(p));
dout(10) << "_add_heartbeat_peer: new peer osd." << p
@@ -1913,10 +1914,9 @@ void OSD::handle_osd_ping(MOSDPing *m)
if (curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
if (is_active()) {
- Connection *con = service.get_con_osd_cluster(from, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
- _share_map_outgoing(from, con);
- con->put();
+ _share_map_outgoing(from, con.get());
}
}
}
@@ -1939,10 +1939,9 @@ void OSD::handle_osd_ping(MOSDPing *m)
curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
if (is_active()) {
- Connection *con = service.get_con_osd_cluster(from, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
- _share_map_outgoing(from, con);
- con->put();
+ _share_map_outgoing(from, con.get());
}
}
}
@@ -2087,14 +2086,15 @@ bool OSD::heartbeat_reset(Connection *con)
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
if (p != heartbeat_peers.end() &&
p->second.con == con) {
- Connection *newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
+ ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
if (!newcon) {
dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl;
s->put();
return true;
}
dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl;
- p->second.con = newcon;
+ p->second.con = newcon.get();
+ p->second.con->get();
p->second.con->set_priv(s);
} else {
dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
@@ -2510,7 +2510,7 @@ void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epo
osd->cluster_messenger->send_message(m, next_osdmap->get_cluster_inst(peer));
}
-Connection *OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
{
Mutex::Locker l(pre_publish_lock);
@@ -2521,10 +2521,13 @@ Connection *OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
next_osdmap->get_info(peer).up_from > from_epoch) {
return NULL;
}
- return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer));
+ ConnectionRef ret(
+ osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer)));
+ ret->put(); // Ref from get_connection
+ return ret;
}
-Connection *OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
{
Mutex::Locker l(pre_publish_lock);
@@ -2535,7 +2538,10 @@ Connection *OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
next_osdmap->get_info(peer).up_from > from_epoch) {
return NULL;
}
- return osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer));
+ ConnectionRef ret(
+ osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer)));
+ ret->put(); // Ref from get_connection
+ return ret;
}
void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
@@ -4688,16 +4694,16 @@ void OSD::do_notifies(
}
if (!curmap->is_up(it->first))
continue;
- Connection *con = service.get_con_osd_cluster(it->first, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(it->first, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(it->first, con, curmap);
+ _share_map_outgoing(it->first, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
dout(7) << "do_notify osd." << it->first
<< " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
it->second);
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
} else {
dout(7) << "do_notify osd." << it->first
<< " sending seperate messages" << dendl;
@@ -4709,10 +4715,9 @@ void OSD::do_notifies(
list[0] = *i;
MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent,
list);
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
}
}
- con->put();
}
}
@@ -4729,15 +4734,15 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
if (!curmap->is_up(pit->first))
continue;
int who = pit->first;
- Connection *con = service.get_con_osd_cluster(who, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(who, con, curmap);
+ _share_map_outgoing(who, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
dout(7) << "do_queries querying osd." << who
<< " on " << pit->second.size() << " PGs" << dendl;
MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
} else {
dout(7) << "do_queries querying osd." << who
<< " sending seperate messages "
@@ -4748,10 +4753,9 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
map<pg_t, pg_query_t> to_send;
to_send.insert(*i);
MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send);
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
}
}
- con->put();
}
}
@@ -4769,14 +4773,14 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
++i) {
dout(20) << "Sending info " << i->first.info << " to osd." << p->first << dendl;
}
- Connection *con = service.get_con_osd_cluster(p->first, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(p->first, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(p->first, con, curmap);
+ _share_map_outgoing(p->first, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
} else {
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
p->second.begin();
@@ -4786,10 +4790,9 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
to_send[0] = *i;
MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent);
m->pg_list = to_send;
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
}
}
- con->put();
}
info_map.clear();
}
@@ -5158,13 +5161,12 @@ void OSD::handle_pg_query(OpRequestRef op)
pg_info_t empty(pgid);
if (it->second.type == pg_query_t::LOG ||
it->second.type == pg_query_t::FULLLOG) {
- Connection *con = service.get_con_osd_cluster(from, osdmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
if (con) {
MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
it->second.epoch_sent);
- _share_map_outgoing(from, con, osdmap);
- cluster_messenger->send_message(mlog, con);
- con->put();
+ _share_map_outgoing(from, con.get(), osdmap);
+ cluster_messenger->send_message(mlog, con.get());
}
} else {
notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent,
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 2b623efa339..a749ec178db 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -225,8 +225,8 @@ public:
Mutex::Locker l(pre_publish_lock);
next_osdmap = map;
}
- Connection *get_con_osd_cluster(int peer, epoch_t from_epoch);
- Connection *get_con_osd_hb(int peer, epoch_t from_epoch);
+ ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
+ ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch);
void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
// -- scrub scheduling --
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index d71fe50a0e9..55348e6c786 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -3383,7 +3383,7 @@ void PG::scrub()
OSDMapRef curmap = osd->get_osdmap();
scrubber.is_chunky = true;
for (unsigned i=1; i<acting.size(); i++) {
- Connection *con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch());
+ ConnectionRef con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch());
if (!con)
continue;
if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) {
@@ -3391,10 +3391,8 @@ void PG::scrub()
<< " does not support chunky scrubs, falling back to classic"
<< dendl;
scrubber.is_chunky = false;
- con->put();
break;
}
- con->put();
}
if (scrubber.is_chunky) {
@@ -4226,11 +4224,10 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
- Connection *con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch());
+ ConnectionRef con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch());
if (con) {
- osd->osd->_share_map_outgoing(from, con, get_osdmap());
- osd->cluster_messenger->send_message(mlog, con);
- con->put();
+ osd->osd->_share_map_outgoing(from, con.get(), get_osdmap());
+ osd->cluster_messenger->send_message(mlog, con.get());
} else {
mlog->put();
}
@@ -5248,7 +5245,8 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con
context< RecoveryMachine >().log_enter(state_name);
PG *pg = context< RecoveryMachine >().pg;
pg->state_set(PG_STATE_BACKFILL_WAIT);
- Connection *con = pg->osd->get_con_osd_cluster(pg->backfill_target, pg->get_osdmap()->get_epoch());
+ ConnectionRef con = pg->osd->get_con_osd_cluster(
+ pg->backfill_target, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
pg->osd->cluster_messenger->send_message(
@@ -5256,11 +5254,10 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con
MBackfillReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- con);
+ con.get());
} else {
post_event(RemoteBackfillReserved());
}
- con->put();
}
}
@@ -5495,18 +5492,17 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con
}
if (acting_osd_it != context< Active >().sorted_acting_set.end()) {
- Connection *con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch());
+ ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
pg->osd->cluster_messenger->send_message(
new MRecoveryReserve(MRecoveryReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- con);
+ con.get());
} else {
post_event(RemoteRecoveryReserved());
}
- con->put();
}
++acting_osd_it;
} else {
@@ -5543,16 +5539,15 @@ void PG::RecoveryState::Recovering::release_reservations()
++i) {
if (*i == pg->osd->whoami) // skip myself
continue;
- Connection *con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch());
+ ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
pg->osd->cluster_messenger->send_message(
new MRecoveryReserve(MRecoveryReserve::RELEASE,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- con);
+ con.get());
}
- con->put();
}
}
}