summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-02-12 13:39:52 -0800
committerSage Weil <sage@inktank.com>2013-02-12 13:39:52 -0800
commit102a519632f1b7a0fede9a3fbd4a5c1df0e732a5 (patch)
treefb3b1f984299de76d46fdbfc894e8f94245b3a24
parent2c6afa058e8b1738c1400392320482945834de86 (diff)
parent2ebf4d065af3dc2e581a25b921071af3efb57f8a (diff)
downloadceph-102a519632f1b7a0fede9a3fbd4a5c1df0e732a5.tar.gz
Merge remote-tracking branch 'gh/wip-bobtail-osd-msgr' into bobtail
-rw-r--r--src/msg/Messenger.h4
-rw-r--r--src/msg/SimpleMessenger.cc4
-rw-r--r--src/msg/SimpleMessenger.h1
-rw-r--r--src/osd/OSD.cc93
-rw-r--r--src/osd/OSD.h4
5 files changed, 38 insertions, 68 deletions
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index 6ece6536ad7..b75e4420f66 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -440,6 +440,10 @@ public:
*/
virtual Connection *get_connection(const entity_inst_t& dest) = 0;
/**
+ * Get the Connection object associated with ourselves.
+ */
+ virtual Connection *get_loopback_connection() = 0;
+ /**
* Send a "keepalive" ping to the given dest, if it has a working Connection.
* If the Messenger doesn't already have a Connection, or if the underlying
* connection has broken, this function does nothing.
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index 0e8ed27c7f4..dd496947151 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -380,6 +380,10 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
}
}
+Connection *SimpleMessenger::get_loopback_connection()
+{
+ return (Connection*)local_connection->get();
+}
void SimpleMessenger::submit_message(Message *m, Connection *con,
const entity_addr_t& dest_addr, int dest_type, bool lazy)
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index fb392e8f741..cc946e3d25a 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -294,6 +294,7 @@ public:
* @return The requested Connection, as a pointer whose reference you own.
*/
virtual Connection *get_connection(const entity_inst_t& dest);
+ virtual Connection *get_loopback_connection();
/**
* Send a "keepalive" ping to the given dest, if it has a working Connection.
* If the Messenger doesn't already have a Connection, or if the underlying
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 4a0b4ef6a9b..661947b4fdf 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -3141,47 +3141,45 @@ void OSD::forget_peer_epoch(int peer, epoch_t as_of)
}
-bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
- Session* session)
+bool OSD::_share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch, Session* session)
{
bool shared = false;
- dout(20) << "_share_map_incoming " << inst << " " << epoch << dendl;
+ dout(20) << "_share_map_incoming " << name << " " << con->get_peer_addr() << " " << epoch << dendl;
//assert(osd_lock.is_locked());
assert(is_active());
// does client have old map?
- if (inst.name.is_client()) {
+ if (name.is_client()) {
bool sendmap = epoch < osdmap->get_epoch();
if (sendmap && session) {
- if ( session->last_sent_epoch < osdmap->get_epoch() ) {
+ if (session->last_sent_epoch < osdmap->get_epoch()) {
session->last_sent_epoch = osdmap->get_epoch();
- }
- else {
+ } else {
sendmap = false; //we don't need to send it out again
- dout(15) << inst.name << " already sent incremental to update from epoch "<< epoch << dendl;
+ dout(15) << name << " already sent incremental to update from epoch "<< epoch << dendl;
}
}
if (sendmap) {
- dout(10) << inst.name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
- send_incremental_map(epoch, inst);
+ dout(10) << name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
+ send_incremental_map(epoch, con);
shared = true;
}
}
// does peer have old map?
- if (inst.name.is_osd() &&
- osdmap->is_up(inst.name.num()) &&
- (osdmap->get_cluster_inst(inst.name.num()) == inst ||
- osdmap->get_hb_inst(inst.name.num()) == inst)) {
+ if (name.is_osd() &&
+ osdmap->is_up(name.num()) &&
+ (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+ osdmap->get_hb_addr(name.num()) == con->get_peer_addr())) {
// remember
- epoch_t has = note_peer_epoch(inst.name.num(), epoch);
+ epoch_t has = note_peer_epoch(name.num(), epoch);
// share?
if (has < osdmap->get_epoch()) {
- dout(10) << inst.name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
- note_peer_epoch(inst.name.num(), osdmap->get_epoch());
- send_incremental_map(epoch, osdmap->get_cluster_inst(inst.name.num()));
+ dout(10) << name << " " << con->get_peer_addr() << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
+ note_peer_epoch(name.num(), osdmap->get_epoch());
+ send_incremental_map(epoch, con);
shared = true;
}
}
@@ -3229,6 +3227,14 @@ bool OSD::heartbeat_dispatch(Message *m)
handle_osd_ping((MOSDPing*)m);
break;
+ case CEPH_MSG_OSD_MAP:
+ {
+ Connection *self = cluster_messenger->get_loopback_connection();
+ cluster_messenger->send_message(m, self);
+ self->put();
+ }
+ break;
+
default:
return false;
}
@@ -4327,17 +4333,6 @@ MOSDMap *OSD::build_incremental_map_msg(epoch_t since, epoch_t to)
return m;
}
-void OSD::send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy)
-{
- Messenger *msgr = client_messenger;
- if (entity_name_t::TYPE_OSD == inst.name._type)
- msgr = cluster_messenger;
- if (lazy)
- msgr->lazy_send_message(m, inst); // only if we already have an open connection
- else
- msgr->send_message(m, inst);
-}
-
void OSD::send_map(MOSDMap *m, Connection *con)
{
Messenger *msgr = client_messenger;
@@ -4346,32 +4341,6 @@ void OSD::send_map(MOSDMap *m, Connection *con)
msgr->send_message(m, con);
}
-void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy)
-{
- dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch()
- << " to " << inst << dendl;
-
- if (since < superblock.oldest_map) {
- // just send latest full map
- MOSDMap *m = new MOSDMap(monc->get_fsid());
- m->oldest_map = superblock.oldest_map;
- m->newest_map = superblock.newest_map;
- epoch_t e = osdmap->get_epoch();
- get_map_bl(e, m->maps[e]);
- send_map(m, inst, lazy);
- return;
- }
-
- while (since < osdmap->get_epoch()) {
- epoch_t to = osdmap->get_epoch();
- if (to - since > (epoch_t)g_conf->osd_map_message_max)
- to = since + g_conf->osd_map_message_max;
- MOSDMap *m = build_incremental_map_msg(since, to);
- send_map(m, inst, lazy);
- since = to;
- }
-}
-
void OSD::send_incremental_map(epoch_t since, Connection *con)
{
dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch()
@@ -4538,13 +4507,7 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
if (!osdmap->have_inst(from) ||
osdmap->get_cluster_addr(from) != m->get_source_inst().addr) {
dout(0) << "from dead osd." << from << ", dropping, sharing map" << dendl;
- send_incremental_map(epoch, m->get_source_inst(), true);
-
- // close after we send the map; don't reconnect
- Connection *con = m->get_connection();
- cluster_messenger->mark_down_on_empty(con);
- cluster_messenger->mark_disposable(con);
-
+ send_incremental_map(epoch, m->get_connection());
return false;
}
}
@@ -5892,7 +5855,7 @@ void OSD::handle_op(OpRequestRef op)
return;
}
// share our map with sender, if they're old
- _share_map_incoming(m->get_source_inst(), m->get_map_epoch(),
+ _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(),
(Session *)m->get_connection()->get_priv());
int r = init_op_flags(m);
if (r) {
@@ -6007,7 +5970,7 @@ void OSD::handle_sub_op(OpRequestRef op)
return;
// share our map with sender, if they're old
- _share_map_incoming(m->get_source_inst(), m->map_epoch,
+ _share_map_incoming(m->get_source(), m->get_connection(), m->map_epoch,
(Session*)m->get_connection()->get_priv());
if (service.splitting(pgid)) {
@@ -6044,7 +6007,7 @@ void OSD::handle_sub_op_reply(OpRequestRef op)
if (!require_same_or_newer_map(op, m->get_map_epoch())) return;
// share our map with sender, if they're old
- _share_map_incoming(m->get_source_inst(), m->get_map_epoch(),
+ _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(),
(Session*)m->get_connection()->get_priv());
PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index b36566d90e7..c3bc0b96839 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -786,7 +786,7 @@ private:
epoch_t note_peer_epoch(int p, epoch_t e);
void forget_peer_epoch(int p, epoch_t e);
- bool _share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
+ bool _share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch,
Session *session = 0);
void _share_map_outgoing(int peer, Connection *con,
OSDMapRef map = OSDMapRef());
@@ -833,9 +833,7 @@ private:
}
MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to);
- void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false);
void send_incremental_map(epoch_t since, Connection *con);
- void send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy);
void send_map(MOSDMap *m, Connection *con);
protected: