diff options
author | Samuel Just <sam.just@inktank.com> | 2012-11-30 11:08:55 -0800 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2012-11-30 11:08:55 -0800 |
commit | 47699f39b9c185454ff86168e4a95b6e5280ae12 (patch) | |
tree | 7a1182384d165a96ce4a117caa73e4299d6f026d | |
parent | 0ffafb3106d6f2d7ac7a51720af709596bf50683 (diff) | |
download | ceph-47699f39b9c185454ff86168e4a95b6e5280ae12.tar.gz |
osd/: make OSDService messenger helpers return ConnectionRef
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/msg/DispatchQueue.h | 3 | ||||
-rw-r--r-- | src/msg/Message.h | 6 | ||||
-rw-r--r-- | src/osd/OSD.cc | 68 | ||||
-rw-r--r-- | src/osd/OSD.h | 4 | ||||
-rw-r--r-- | src/osd/PG.cc | 27 |
5 files changed, 54 insertions, 54 deletions
diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index ad4584829d1..ea44c165d56 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -33,9 +33,6 @@ class SimpleMessenger; class Message; class Connection; -typedef boost::intrusive_ptr<Connection> ConnectionRef; -typedef boost::intrusive_ptr<Message> MessageRef; - /** * The DispatchQueue contains all the Pipes which have Messages * they want to be dispatched, carefully organized by Message priority diff --git a/src/msg/Message.h b/src/msg/Message.h index b6a113f771f..fc434ed9b85 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -18,6 +18,10 @@ #include <stdlib.h> #include <ostream> +#include <boost/intrusive_ptr.hpp> +// Because intusive_ptr clobbers our assert... +#include "include/assert.h" + #include "include/types.h" #include "include/buffer.h" #include "common/Throttle.h" @@ -261,6 +265,7 @@ public: rx_buffers.erase(tid); } }; +typedef boost::intrusive_ptr<Connection> ConnectionRef; @@ -466,6 +471,7 @@ public: void encode(uint64_t features, bool datacrc); }; +typedef boost::intrusive_ptr<Message> MessageRef; extern Message *decode_message(CephContext *cct, ceph_msg_header &header, ceph_msg_footer& footer, bufferlist& front, diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 9c57b66c4b8..feff33d4943 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1781,11 +1781,12 @@ void OSD::_add_heartbeat_peer(int p) map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p); if (i == heartbeat_peers.end()) { - Connection *con = service.get_con_osd_hb(p, osdmap->get_epoch()); + ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch()); if (!con) return; hi = &heartbeat_peers[p]; - hi->con = con; + hi->con = con.get(); + hi->con->get(); hi->peer = p; hi->con->set_priv(new HeartbeatSession(p)); dout(10) << "_add_heartbeat_peer: new peer osd." << p @@ -1913,10 +1914,9 @@ void OSD::handle_osd_ping(MOSDPing *m) if (curmap->is_up(from)) { note_peer_epoch(from, m->map_epoch); if (is_active()) { - Connection *con = service.get_con_osd_cluster(from, curmap->get_epoch()); + ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch()); if (con) { - _share_map_outgoing(from, con); - con->put(); + _share_map_outgoing(from, con.get()); } } } @@ -1939,10 +1939,9 @@ void OSD::handle_osd_ping(MOSDPing *m) curmap->is_up(from)) { note_peer_epoch(from, m->map_epoch); if (is_active()) { - Connection *con = service.get_con_osd_cluster(from, curmap->get_epoch()); + ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch()); if (con) { - _share_map_outgoing(from, con); - con->put(); + _share_map_outgoing(from, con.get()); } } } @@ -2087,14 +2086,15 @@ bool OSD::heartbeat_reset(Connection *con) map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer); if (p != heartbeat_peers.end() && p->second.con == con) { - Connection *newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch); + ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch); if (!newcon) { dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl; s->put(); return true; } dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl; - p->second.con = newcon; + p->second.con = newcon.get(); + p->second.con->get(); p->second.con->set_priv(s); } else { dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl; @@ -2510,7 +2510,7 @@ void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epo osd->cluster_messenger->send_message(m, next_osdmap->get_cluster_inst(peer)); } -Connection *OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch) +ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch) { Mutex::Locker l(pre_publish_lock); @@ -2521,10 +2521,13 @@ Connection *OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch) next_osdmap->get_info(peer).up_from > from_epoch) { return NULL; } - return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer)); + ConnectionRef ret( + osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer))); + ret->put(); // Ref from get_connection + return ret; } -Connection *OSDService::get_con_osd_hb(int peer, epoch_t from_epoch) +ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch) { Mutex::Locker l(pre_publish_lock); @@ -2535,7 +2538,10 @@ Connection *OSDService::get_con_osd_hb(int peer, epoch_t from_epoch) next_osdmap->get_info(peer).up_from > from_epoch) { return NULL; } - return osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer)); + ConnectionRef ret( + osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer))); + ret->put(); // Ref from get_connection + return ret; } void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want) @@ -4688,16 +4694,16 @@ void OSD::do_notifies( } if (!curmap->is_up(it->first)) continue; - Connection *con = service.get_con_osd_cluster(it->first, curmap->get_epoch()); + ConnectionRef con = service.get_con_osd_cluster(it->first, curmap->get_epoch()); if (!con) continue; - _share_map_outgoing(it->first, con, curmap); + _share_map_outgoing(it->first, con.get(), curmap); if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { dout(7) << "do_notify osd." << it->first << " on " << it->second.size() << " PGs" << dendl; MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(), it->second); - cluster_messenger->send_message(m, con); + cluster_messenger->send_message(m, con.get()); } else { dout(7) << "do_notify osd." << it->first << " sending seperate messages" << dendl; @@ -4709,10 +4715,9 @@ void OSD::do_notifies( list[0] = *i; MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent, list); - cluster_messenger->send_message(m, con); + cluster_messenger->send_message(m, con.get()); } } - con->put(); } } @@ -4729,15 +4734,15 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map, if (!curmap->is_up(pit->first)) continue; int who = pit->first; - Connection *con = service.get_con_osd_cluster(who, curmap->get_epoch()); + ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch()); if (!con) continue; - _share_map_outgoing(who, con, curmap); + _share_map_outgoing(who, con.get(), curmap); if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { dout(7) << "do_queries querying osd." << who << " on " << pit->second.size() << " PGs" << dendl; MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second); - cluster_messenger->send_message(m, con); + cluster_messenger->send_message(m, con.get()); } else { dout(7) << "do_queries querying osd." << who << " sending seperate messages " @@ -4748,10 +4753,9 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map, map<pg_t, pg_query_t> to_send; to_send.insert(*i); MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send); - cluster_messenger->send_message(m, con); + cluster_messenger->send_message(m, con.get()); } } - con->put(); } } @@ -4769,14 +4773,14 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info ++i) { dout(20) << "Sending info " << i->first.info << " to osd." << p->first << dendl; } - Connection *con = service.get_con_osd_cluster(p->first, curmap->get_epoch()); + ConnectionRef con = service.get_con_osd_cluster(p->first, curmap->get_epoch()); if (!con) continue; - _share_map_outgoing(p->first, con, curmap); + _share_map_outgoing(p->first, con.get(), curmap); if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch()); m->pg_list = p->second; - cluster_messenger->send_message(m, con); + cluster_messenger->send_message(m, con.get()); } else { for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i = p->second.begin(); @@ -4786,10 +4790,9 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info to_send[0] = *i; MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent); m->pg_list = to_send; - cluster_messenger->send_message(m, con); + cluster_messenger->send_message(m, con.get()); } } - con->put(); } info_map.clear(); } @@ -5158,13 +5161,12 @@ void OSD::handle_pg_query(OpRequestRef op) pg_info_t empty(pgid); if (it->second.type == pg_query_t::LOG || it->second.type == pg_query_t::FULLLOG) { - Connection *con = service.get_con_osd_cluster(from, osdmap->get_epoch()); + ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch()); if (con) { MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty, it->second.epoch_sent); - _share_map_outgoing(from, con, osdmap); - cluster_messenger->send_message(mlog, con); - con->put(); + _share_map_outgoing(from, con.get(), osdmap); + cluster_messenger->send_message(mlog, con.get()); } } else { notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent, diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 2b623efa339..a749ec178db 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -225,8 +225,8 @@ public: Mutex::Locker l(pre_publish_lock); next_osdmap = map; } - Connection *get_con_osd_cluster(int peer, epoch_t from_epoch); - Connection *get_con_osd_hb(int peer, epoch_t from_epoch); + ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch); + ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch); void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch); // -- scrub scheduling -- diff --git a/src/osd/PG.cc b/src/osd/PG.cc index d71fe50a0e9..55348e6c786 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -3383,7 +3383,7 @@ void PG::scrub() OSDMapRef curmap = osd->get_osdmap(); scrubber.is_chunky = true; for (unsigned i=1; i<acting.size(); i++) { - Connection *con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch()); + ConnectionRef con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch()); if (!con) continue; if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) { @@ -3391,10 +3391,8 @@ void PG::scrub() << " does not support chunky scrubs, falling back to classic" << dendl; scrubber.is_chunky = false; - con->put(); break; } - con->put(); } if (scrubber.is_chunky) { @@ -4226,11 +4224,10 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch) dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; - Connection *con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch()); + ConnectionRef con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch()); if (con) { - osd->osd->_share_map_outgoing(from, con, get_osdmap()); - osd->cluster_messenger->send_message(mlog, con); - con->put(); + osd->osd->_share_map_outgoing(from, con.get(), get_osdmap()); + osd->cluster_messenger->send_message(mlog, con.get()); } else { mlog->put(); } @@ -5248,7 +5245,8 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con context< RecoveryMachine >().log_enter(state_name); PG *pg = context< RecoveryMachine >().pg; pg->state_set(PG_STATE_BACKFILL_WAIT); - Connection *con = pg->osd->get_con_osd_cluster(pg->backfill_target, pg->get_osdmap()->get_epoch()); + ConnectionRef con = pg->osd->get_con_osd_cluster( + pg->backfill_target, pg->get_osdmap()->get_epoch()); if (con) { if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) { pg->osd->cluster_messenger->send_message( @@ -5256,11 +5254,10 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con MBackfillReserve::REQUEST, pg->info.pgid, pg->get_osdmap()->get_epoch()), - con); + con.get()); } else { post_event(RemoteBackfillReserved()); } - con->put(); } } @@ -5495,18 +5492,17 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con } if (acting_osd_it != context< Active >().sorted_acting_set.end()) { - Connection *con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch()); + ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch()); if (con) { if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { pg->osd->cluster_messenger->send_message( new MRecoveryReserve(MRecoveryReserve::REQUEST, pg->info.pgid, pg->get_osdmap()->get_epoch()), - con); + con.get()); } else { post_event(RemoteRecoveryReserved()); } - con->put(); } ++acting_osd_it; } else { @@ -5543,16 +5539,15 @@ void PG::RecoveryState::Recovering::release_reservations() ++i) { if (*i == pg->osd->whoami) // skip myself continue; - Connection *con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch()); + ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch()); if (con) { if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { pg->osd->cluster_messenger->send_message( new MRecoveryReserve(MRecoveryReserve::RELEASE, pg->info.pgid, pg->get_osdmap()->get_epoch()), - con); + con.get()); } - con->put(); } } } |