diff options
author | Sage Weil <sage@newdream.net> | 2013-03-18 15:53:56 -0700 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2013-03-18 15:53:56 -0700 |
commit | 7e7a19ba18cf36a9d1002dad6277fe27512f2ec7 (patch) | |
tree | c87d3d8e540587e298de49cdc9e380ddb4e1501f | |
parent | a2ac9358a32f76f9fa4f922c056d7f8dabe52869 (diff) | |
parent | cecfe411bba37983eabee3da06ab930c8b025358 (diff) | |
download | ceph-7e7a19ba18cf36a9d1002dad6277fe27512f2ec7.tar.gz |
Merge pull request #115 from ceph/wip-4199
Resolves #4199
Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/Makefile.am | 9 | ||||
-rw-r--r-- | src/common/config_opts.h | 3 | ||||
-rw-r--r-- | src/global/signal_handler.cc | 6 | ||||
-rw-r--r-- | src/global/signal_handler.h | 3 | ||||
-rw-r--r-- | src/messages/MMonHealth.h | 108 | ||||
-rw-r--r-- | src/messages/MMonQuorumService.h | 72 | ||||
-rw-r--r-- | src/mon/DataHealthService.cc | 211 | ||||
-rw-r--r-- | src/mon/DataHealthService.h | 88 | ||||
-rw-r--r-- | src/mon/HealthMonitor.cc | 95 | ||||
-rw-r--r-- | src/mon/HealthMonitor.h | 87 | ||||
-rw-r--r-- | src/mon/HealthService.h | 52 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 17 | ||||
-rw-r--r-- | src/mon/Monitor.h | 29 | ||||
-rw-r--r-- | src/mon/QuorumService.h | 142 | ||||
-rw-r--r-- | src/mon/mon_types.h | 34 | ||||
-rw-r--r-- | src/msg/Message.cc | 5 | ||||
-rw-r--r-- | src/msg/Message.h | 1 |
17 files changed, 961 insertions, 1 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index b00fe03b0c5..f8593e5bf3f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1364,7 +1364,9 @@ libmon_a_SOURCES = \ mon/AuthMonitor.cc \ mon/Elector.cc \ mon/MonitorStore.cc \ - os/LevelDBStore.cc + os/LevelDBStore.cc \ + mon/HealthMonitor.cc \ + mon/DataHealthService.cc libmon_a_CXXFLAGS= ${AM_CXXFLAGS} noinst_LIBRARIES += libmon.a @@ -1826,6 +1828,7 @@ noinst_HEADERS = \ messages/MOSDPGScan.h\ messages/MBackfillReserve.h\ messages/MRecoveryReserve.h\ + messages/MMonQuorumService.h\ messages/MOSDPGTemp.h\ messages/MOSDPGTrim.h\ messages/MOSDPing.h\ @@ -1847,14 +1850,17 @@ noinst_HEADERS = \ messages/MWatchNotify.h\ messages/PaxosServiceMessage.h\ mon/AuthMonitor.h\ + mon/DataHealthMonitor.h\ mon/Elector.h\ mon/LogMonitor.h\ + mon/HealthMonitor.h\ mon/MDSMonitor.h\ mon/MonmapMonitor.h\ mon/MonCaps.h\ mon/MonClient.h\ mon/MonMap.h\ mon/Monitor.h\ + mon/MonitorHealthService.h\ mon/MonitorStore.h\ mon/MonitorDBStore.h\ mon/OSDMonitor.h\ @@ -1862,6 +1868,7 @@ noinst_HEADERS = \ mon/PGMonitor.h\ mon/Paxos.h\ mon/PaxosService.h\ + mon/QuorumService.h\ mon/Session.h\ mon/mon_types.h\ mount/canonicalize.c\ diff --git a/src/common/config_opts.h b/src/common/config_opts.h index e99688342b7..9e832be2db8 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -162,6 +162,9 @@ OPTION(mon_slurp_bytes, OPT_INT, 256*1024) // limit size of slurp messages OPTION(mon_client_bytes, OPT_U64, 100ul << 20) // client msg data allowed in memory (in bytes) OPTION(mon_daemon_bytes, OPT_U64, 400ul << 20) // mds, osd message memory cap (in bytes) OPTION(mon_max_log_entries_per_event, OPT_INT, 4096) +OPTION(mon_health_data_update_interval, OPT_FLOAT, 60.0) +OPTION(mon_data_avail_crit, OPT_INT, 5) +OPTION(mon_data_avail_warn, OPT_INT, 30) OPTION(mon_sync_trim_timeout, OPT_DOUBLE, 30.0) OPTION(mon_sync_heartbeat_timeout, OPT_DOUBLE, 30.0) OPTION(mon_sync_heartbeat_interval, OPT_DOUBLE, 5.0) diff --git a/src/global/signal_handler.cc b/src/global/signal_handler.cc index 2a6260da66d..25f1a0a1992 100644 --- a/src/global/signal_handler.cc +++ b/src/global/signal_handler.cc @@ -325,6 +325,12 @@ void shutdown_async_signal_handler() g_signal_handler = NULL; } +void queue_async_signal(int signum) +{ + assert(g_signal_handler); + g_signal_handler->queue_signal(signum); +} + void register_async_signal_handler(int signum, signal_handler_t handler) { assert(g_signal_handler); diff --git a/src/global/signal_handler.h b/src/global/signal_handler.h index 8acfaed1a4c..3a11f54315e 100644 --- a/src/global/signal_handler.h +++ b/src/global/signal_handler.h @@ -35,6 +35,9 @@ void init_async_signal_handler(); /// shutdown async signal handler framework void shutdown_async_signal_handler(); +/// queue an async signal +void queue_async_signal(int signum); + /// install a safe, async, callback for the given signal void register_async_signal_handler(int signum, signal_handler_t handler); void register_async_signal_handler_oneshot(int signum, signal_handler_t handler); diff --git a/src/messages/MMonHealth.h b/src/messages/MMonHealth.h new file mode 100644 index 00000000000..2ef3cef519c --- /dev/null +++ b/src/messages/MMonHealth.h @@ -0,0 +1,108 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2012 Inktank, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_MMON_HEALTH_H +#define CEPH_MMON_HEALTH_H + +#include "msg/Message.h" +#include "messages/MMonQuorumService.h" +#include "mon/mon_types.h" + +struct MMonHealth : public MMonQuorumService +{ + static const int HEAD_VERSION = 1; + + enum { + OP_TELL = 1, + }; + + static const uint32_t FLAG_DATA = 0x01; + + int service_type; + int service_op; + uint32_t flags; + + // service specific data + DataStats data_stats; + + MMonHealth() : MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION) { } + MMonHealth(uint32_t type) : + MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION), + service_type(type) + { } + MMonHealth(uint32_t type, int op) : + MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION), + service_type(type), + service_op(op) + { } + +private: + ~MMonHealth() { } + +public: + const char *get_type_name() const { return "mon_health"; } + const char *get_service_op_name() const { + switch (service_op) { + case OP_TELL: return "tell"; + } + return "???"; + } + void print(ostream &o) const { + o << "mon_health( service " << get_service_type() + << " op " << get_service_op_name() + << " e " << get_epoch() << " r " << get_round() + << " flags"; + if (!flags) { + o << " none"; + } else { + if (has_flag(FLAG_DATA)) { + o << " data"; + } + } + o << " )"; + } + + int get_service_type() const { + return service_type; + } + + int get_service_op() { + return service_op; + } + + void set_flag(uint32_t f) { + flags |= f; + } + + bool has_flag(uint32_t f) const { + return (flags & f); + } + + void decode_payload() { + bufferlist::iterator p = payload.begin(); + service_decode(p); + ::decode(service_type, p); + ::decode(service_op, p); + ::decode(data_stats, p); + } + + void encode_payload(uint64_t features) { + service_encode(); + ::encode(service_type, payload); + ::encode(service_op, payload); + ::encode(data_stats, payload); + } + +}; + +#endif /* CEPH_MMON_HEALTH_H */ diff --git a/src/messages/MMonQuorumService.h b/src/messages/MMonQuorumService.h new file mode 100644 index 00000000000..6a25ae1e203 --- /dev/null +++ b/src/messages/MMonQuorumService.h @@ -0,0 +1,72 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2012 Inktank, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_MMON_QUORUM_SERVICE_H +#define CEPH_MMON_QUORUM_SERVICE_H + +#include "msg/Message.h" + +struct MMonQuorumService : public Message +{ + epoch_t epoch; + version_t round; + + MMonQuorumService(int type, int head=1, int compat=1) : + Message(type, head, compat), + epoch(0), + round(0) + { } + +protected: + ~MMonQuorumService() { } + +public: + + void set_epoch(epoch_t e) { + epoch = e; + } + + void set_round(version_t r) { + round = r; + } + + epoch_t get_epoch() const { + return epoch; + } + + version_t get_round() const { + return round; + } + + void service_encode() { + ::encode(epoch, payload); + ::encode(round, payload); + } + + void service_decode(bufferlist::iterator &p) { + ::decode(epoch, p); + ::decode(round, p); + } + + void encode_payload(uint64_t features) { + assert(0 == "MMonQuorumService message must always be a base class"); + } + + void decode_payload() { + assert(0 == "MMonQuorumService message must always be a base class"); + } + + const char *get_type_name() const { return "quorum_service"; } +}; + +#endif /* CEPH_MMON_QUORUM_SERVICE_H */ diff --git a/src/mon/DataHealthService.cc b/src/mon/DataHealthService.cc new file mode 100644 index 00000000000..d97b458ec03 --- /dev/null +++ b/src/mon/DataHealthService.cc @@ -0,0 +1,211 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include <memory> +#include <tr1/memory> +#include <errno.h> +#include <map> +#include <list> +#include <string> +#include <sstream> +#include <sys/vfs.h> + +#include "messages/MMonHealth.h" +#include "include/types.h" +#include "include/Context.h" +#include "include/assert.h" +#include "common/Formatter.h" + +#include "mon/Monitor.h" +#include "mon/QuorumService.h" +#include "mon/DataHealthService.h" + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix _prefix(_dout, mon, this) +static ostream& _prefix(std::ostream *_dout, const Monitor *mon, + const DataHealthService *svc) { + assert(mon != NULL); + assert(svc != NULL); + return *_dout << "mon." << mon->name << "@" << mon->rank + << "(" << mon->get_state_name() << ")." << svc->get_name() + << "(" << svc->get_epoch() << ") "; +} + +void DataHealthService::start_epoch() +{ + dout(10) << __func__ << " epoch " << get_epoch() << dendl; + // we are not bound by election epochs, but we should clear the stats + // everytime an election is triggerd. As far as we know, a monitor might + // have been running out of disk space and someone fixed it. We don't want + // to hold the cluster back, even confusing the user, due to some possibly + // outdated stats. + stats.clear(); +} + +void DataHealthService::get_health(Formatter *f, + list<pair<health_status_t,string> > *detail) +{ + dout(10) << __func__ << dendl; + assert(f != NULL); + + f->open_object_section("data_health"); + f->open_array_section("mons"); + + for (map<entity_inst_t,DataStats>::iterator it = stats.begin(); + it != stats.end(); ++it) { + string mon_name = mon->monmap->get_name(it->first.addr); + DataStats& stats = it->second; + + health_status_t health_status = HEALTH_OK; + string health_detail; + if (stats.latest_avail_percent <= g_conf->mon_data_avail_crit) { + health_status = HEALTH_ERR; + health_detail = "shutdown iminent!"; + } else if (stats.latest_avail_percent <= g_conf->mon_data_avail_warn) { + health_status = HEALTH_WARN; + health_detail = "low disk space!"; + } + + if (detail && health_status != HEALTH_OK) { + stringstream ss; + ss << "mon." << mon_name << " addr " << it->first.addr + << " has " << stats.latest_avail_percent + << "\% avail disk space -- " << health_detail; + detail->push_back(make_pair(health_status, ss.str())); + } + + f->open_object_section(mon_name.c_str()); + f->dump_string("name", mon_name.c_str()); + f->dump_int("kb_total", stats.kb_total); + f->dump_int("kb_used", stats.kb_used); + f->dump_int("kb_avail", stats.kb_avail); + f->dump_int("avail_percent", stats.latest_avail_percent); + f->dump_stream("last_updated") << stats.last_update; + f->dump_stream("health") << health_status; + if (health_status != HEALTH_OK) + f->dump_string("health_detail", health_detail); + f->close_section(); + } + + f->close_section(); // mons + f->close_section(); // data_health +} + +void DataHealthService::update_stats() +{ + struct statfs stbuf; + int err = ::statfs(g_conf->mon_data.c_str(), &stbuf); + if (err < 0) { + assert(errno != EIO); + mon->clog.error() << __func__ << " statfs error: " << cpp_strerror(errno) << "\n"; + return; + } + + entity_inst_t our_inst = mon->monmap->get_inst(mon->name); + DataStats& ours = stats[our_inst]; + + ours.kb_total = stbuf.f_blocks * stbuf.f_bsize / 1024; + ours.kb_used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize / 1024; + ours.kb_avail = stbuf.f_bavail * stbuf.f_bsize / 1024; + ours.latest_avail_percent = (((float)ours.kb_avail/ours.kb_total)*100); + dout(0) << __func__ << " avail " << ours.latest_avail_percent << "%" + << " total " << ours.kb_total << " used " << ours.kb_used << " avail " << ours.kb_avail + << dendl; + ours.last_update = ceph_clock_now(g_ceph_context); +} + +void DataHealthService::share_stats() +{ + dout(10) << __func__ << dendl; + if (!in_quorum()) + return; + + assert(!stats.empty()); + entity_inst_t our_inst = mon->monmap->get_inst(mon->rank); + assert(stats.count(our_inst) > 0); + DataStats &ours = stats[our_inst]; + const set<int>& quorum = mon->get_quorum(); + for (set<int>::const_iterator it = quorum.begin(); + it != quorum.end(); ++it) { + if (mon->monmap->get_name(*it) == mon->name) + continue; + entity_inst_t inst = mon->monmap->get_inst(*it); + MMonHealth *m = new MMonHealth(HealthService::SERVICE_HEALTH_DATA, + MMonHealth::OP_TELL); + m->data_stats = ours; + dout(20) << __func__ << " send " << *m << " to " << inst << dendl; + mon->messenger->send_message(m, inst); + } +} + +void DataHealthService::service_tick() +{ + dout(10) << __func__ << dendl; + + update_stats(); + if (in_quorum()) + share_stats(); + + DataStats &ours = stats[mon->monmap->get_inst(mon->name)]; + + if (ours.latest_avail_percent <= g_conf->mon_data_avail_crit) { + mon->clog.error() + << "reached critical levels of available space on data store" + << " -- shutdown!\n"; + force_shutdown(); + return; + } + + // we must backoff these warnings, and track how much data is being + // consumed in-between reports to assess if it's worth to log this info, + // otherwise we may very well contribute to the consumption of the + // already low available disk space. + if (ours.latest_avail_percent <= g_conf->mon_data_avail_warn) { + mon->clog.warn() + << "reached concerning levels of available space on data store" + << " (" << ours.latest_avail_percent << "\% free)\n"; + } +} + +void DataHealthService::handle_tell(MMonHealth *m) +{ + dout(10) << __func__ << " " << *m << dendl; + assert(m->get_service_op() == MMonHealth::OP_TELL); + + stats[m->get_source_inst()] = m->data_stats; +} + +bool DataHealthService::service_dispatch(MMonHealth *m) +{ + dout(10) << __func__ << " " << *m << dendl; + assert(m->get_service_type() == get_type()); + if (!in_quorum()) { + dout(1) << __func__ << " not in quorum -- drop message" << dendl; + m->put(); + return false; + } + + switch (m->service_op) { + case MMonHealth::OP_TELL: + // someone is telling us their stats + handle_tell(m); + break; + default: + dout(0) << __func__ << " unknown op " << m->service_op << dendl; + assert(0 == "Unknown service op"); + break; + } + m->put(); + return true; +} diff --git a/src/mon/DataHealthService.h b/src/mon/DataHealthService.h new file mode 100644 index 00000000000..d54af792d9c --- /dev/null +++ b/src/mon/DataHealthService.h @@ -0,0 +1,88 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_MON_DATA_HEALTH_SERVICE_H +#define CEPH_MON_DATA_HEALTH_SERVICE_H + +#include <boost/intrusive_ptr.hpp> +// Because intusive_ptr clobbers our assert... +#include "include/assert.h" +#include <errno.h> + +#include "include/types.h" +#include "include/Context.h" +#include "mon/mon_types.h" +#include "mon/QuorumService.h" +#include "mon/HealthService.h" +#include "common/Formatter.h" +#include "common/config.h" +#include "global/signal_handler.h" + +class MMonHealth; + +class DataHealthService : + public HealthService +{ + map<entity_inst_t,DataStats> stats; + void handle_tell(MMonHealth *m); + void update_stats(); + void share_stats(); + + void force_shutdown() { + generic_dout(0) << "** Shutdown via Data Health Service **" << dendl; + queue_async_signal(SIGINT); + } + +protected: + virtual void service_tick(); + virtual bool service_dispatch(Message *m) { + assert(0 == "We should never reach this; only the function below"); + return false; + } + virtual bool service_dispatch(MMonHealth *m); + virtual void service_shutdown() { } + + virtual void start_epoch(); + virtual void finish_epoch() { } + virtual void cleanup() { } + +public: + DataHealthService(Monitor *m) : + HealthService(m) + { + set_update_period(g_conf->mon_health_data_update_interval); + } + virtual ~DataHealthService() { } + DataHealthService *get() { + return static_cast<DataHealthService *>(RefCountedObject::get()); + } + + virtual void init() { + generic_dout(20) << "data_health " << __func__ << dendl; + start_tick(); + } + + virtual void get_health(Formatter *f, + list<pair<health_status_t,string> > *detail); + + virtual int get_type() { + return HealthService::SERVICE_HEALTH_DATA; + } + + virtual string get_name() const { + return "data_health"; + } +}; +typedef boost::intrusive_ptr<DataHealthService> DataHealthServiceRef; + +#endif /* CEPH_MON_DATA_HEALTH_SERVICE_H */ diff --git a/src/mon/HealthMonitor.cc b/src/mon/HealthMonitor.cc new file mode 100644 index 00000000000..cf3d962b2c4 --- /dev/null +++ b/src/mon/HealthMonitor.cc @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <sstream> +#include <stdlib.h> +#include <limits.h> + +#include <boost/intrusive_ptr.hpp> +// Because intusive_ptr clobbers our assert... +#include "include/assert.h" + +#include "mon/Monitor.h" +#include "mon/QuorumService.h" +#include "mon/HealthService.h" +#include "mon/HealthMonitor.h" +#include "mon/DataHealthService.h" + +#include "messages/MMonHealth.h" + +#include "common/config.h" + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix _prefix(_dout, mon, this) +static ostream& _prefix(std::ostream *_dout, const Monitor *mon, + const HealthMonitor *hmon) { + return *_dout << "mon." << mon->name << "@" << mon->rank + << "(" << mon->get_state_name() << ")." << hmon->get_name() + << "(" << hmon->get_epoch() << ") "; +} + +void HealthMonitor::init() +{ + dout(10) << __func__ << dendl; + assert(services.empty()); + services[HealthService::SERVICE_HEALTH_DATA] = + HealthServiceRef(new DataHealthService(mon)); + + for (map<int,HealthServiceRef>::iterator it = services.begin(); + it != services.end(); ++it) { + it->second->init(); + } +} + +bool HealthMonitor::service_dispatch(Message *m) +{ + assert(m->get_type() == MSG_MON_HEALTH); + MMonHealth *hm = (MMonHealth*)m; + int service_type = hm->get_service_type(); + if (services.count(service_type) == 0) { + dout(1) << __func__ << " service type " << service_type + << " not registered -- drop message!" << dendl; + m->put(); + return false; + } + return services[service_type]->service_dispatch(hm); +} + +void HealthMonitor::service_shutdown() +{ + dout(0) << "HealthMonitor::service_shutdown " + << services.size() << " services" << dendl; + for (map<int,HealthServiceRef>::iterator it = services.begin(); + it != services.end(); ++it) { + it->second->shutdown(); + } + services.clear(); +} + +void HealthMonitor::get_health(Formatter *f, + list<pair<health_status_t,string> > *detail) { + assert(f != NULL); + f->open_object_section("health"); + f->open_array_section("health_services"); + + for (map<int,HealthServiceRef>::iterator it = services.begin(); + it != services.end(); ++it) { + it->second->get_health(f, detail); + } + + f->close_section(); // health_services + f->close_section(); // health +} + diff --git a/src/mon/HealthMonitor.h b/src/mon/HealthMonitor.h new file mode 100644 index 00000000000..85581edde9b --- /dev/null +++ b/src/mon/HealthMonitor.h @@ -0,0 +1,87 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_HEALTH_MONITOR_H +#define CEPH_HEALTH_MONITOR_H + +#include <boost/intrusive_ptr.hpp> +// Because intusive_ptr clobbers our assert... +#include "include/assert.h" + +#include "mon/Monitor.h" +#include "mon/QuorumService.h" +#include "mon/HealthService.h" + +#include "messages/MMonHealth.h" + +#include "common/config.h" +#include "common/Formatter.h" + +class HealthMonitor : public QuorumService +{ + map<int,HealthServiceRef> services; + +protected: + virtual void service_shutdown(); + +public: + HealthMonitor(Monitor *m) : QuorumService(m) { } + virtual ~HealthMonitor() { } + HealthMonitor *get() { + return static_cast<HealthMonitor *>(RefCountedObject::get()); + } + + + /** + * @defgroup HealthMonitor_Inherited_h Inherited abstract methods + * @{ + */ + virtual void init(); + virtual void get_health(Formatter *f, + list<pair<health_status_t,string> > *detail); + virtual bool service_dispatch(Message *m); + + virtual void start_epoch() { + for (map<int,HealthServiceRef>::iterator it = services.begin(); + it != services.end(); ++it) { + it->second->start(get_epoch()); + } + } + + virtual void finish_epoch() { + generic_dout(20) << "HealthMonitor::finish_epoch()" << dendl; + for (map<int,HealthServiceRef>::iterator it = services.begin(); + it != services.end(); ++it) { + assert(it->second.get() != NULL); + it->second->finish(); + } + } + + virtual void cleanup() { } + virtual void service_tick() { } + + virtual int get_type() { + return QuorumService::SERVICE_HEALTH; + } + + virtual string get_name() const { + return "health"; + } + + /** + * @} // HealthMonitor_Inherited_h + */ +}; +typedef boost::intrusive_ptr<HealthMonitor> HealthMonitorRef; + +#endif // CEPH_HEALTH_MONITOR_H diff --git a/src/mon/HealthService.h b/src/mon/HealthService.h new file mode 100644 index 00000000000..a4de07a1a63 --- /dev/null +++ b/src/mon/HealthService.h @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_MON_HEALTH_SERVICE_H +#define CEPH_MON_HEALTH_SERVICE_H + +#include <boost/intrusive_ptr.hpp> +// Because intusive_ptr clobbers our assert... +#include "include/assert.h" + +#include "mon/Monitor.h" +#include "mon/QuorumService.h" + +#include "messages/MMonHealth.h" + +#include "common/config.h" + +struct HealthService : public QuorumService +{ + static const int SERVICE_HEALTH_DATA = 0x01; + + HealthService(Monitor *m) : QuorumService(m) { } + virtual ~HealthService() { } + + virtual bool service_dispatch(Message *m) { + return service_dispatch(static_cast<MMonHealth*>(m)); + } + + virtual bool service_dispatch(MMonHealth *m) = 0; + +public: + HealthService *get() { + return static_cast<HealthService *>(RefCountedObject::get()); + } + virtual void get_health(Formatter *f, + list<pair<health_status_t,string> > *detail) = 0; + virtual int get_type() = 0; + virtual string get_name() const = 0; +}; +typedef boost::intrusive_ptr<HealthService> HealthServiceRef; + +#endif // CEPH_MON_HEALTH_SERVICE_H diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 769381ba499..9f35e0df615 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -49,6 +49,7 @@ #include "messages/MAuthReply.h" #include "messages/MTimeCheck.h" +#include "messages/MMonHealth.h" #include "common/strtol.h" #include "common/ceph_argparse.h" @@ -68,6 +69,8 @@ #include "PGMonitor.h" #include "LogMonitor.h" #include "AuthMonitor.h" +#include "mon/QuorumService.h" +#include "mon/HealthMonitor.h" #include "auth/AuthMethodList.h" #include "auth/KeyRing.h" @@ -163,6 +166,8 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm"); paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth"); + health_monitor = QuorumServiceRef(new HealthMonitor(this)); + mon_caps = new MonCaps(); mon_caps->set_allow_all(true); mon_caps->text = "allow *"; @@ -418,6 +423,7 @@ int Monitor::preinit() } init_paxos(); + health_monitor->init(); // we need to bootstrap authentication keys so we can form an // initial quorum. @@ -565,6 +571,7 @@ void Monitor::shutdown() // clean up for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) (*p)->shutdown(); + health_monitor->shutdown(); finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED); finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED); @@ -683,6 +690,7 @@ void Monitor::reset() for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) (*p)->restart(); + health_monitor->finish(); } set<string> Monitor::get_sync_targets_names() { @@ -1964,6 +1972,7 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features) paxos->leader_init(); for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) (*p)->election_finished(); + health_monitor->start(epoch); finish_election(); if (monmap->size() > 1) @@ -1997,6 +2006,7 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features paxos->peon_init(); for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) (*p)->election_finished(); + health_monitor->start(epoch); finish_election(); } @@ -2283,6 +2293,9 @@ void Monitor::get_health(string& status, bufferlist *detailbl, Formatter *f) if (f) f->close_section(); + if (f) + health_monitor->get_health(f, (detailbl ? &detail : NULL)); + stringstream fss; fss << overall; status = fss.str() + ss.str(); @@ -3174,6 +3187,10 @@ bool Monitor::_ms_dispatch(Message *m) handle_timecheck(static_cast<MTimeCheck *>(m)); break; + case MSG_MON_HEALTH: + health_monitor->dispatch(static_cast<MMonHealth *>(m)); + break; + default: ret = false; } diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 9d2bd598823..55f27c7822d 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -51,6 +51,9 @@ #include <memory> #include <tr1/memory> #include <errno.h> +#include <boost/intrusive_ptr.hpp> +// Because intusive_ptr clobbers our assert... +#include "include/assert.h" #define CEPH_MON_PROTOCOL 10 /* cluster internal */ @@ -83,6 +86,7 @@ enum { l_cluster_last, }; +class QuorumService; class PaxosService; class PerfCounters; @@ -97,6 +101,7 @@ class MAuthRotating; class MRoute; class MForward; class MTimeCheck; +class MMonHealth; #define COMPAT_SET_LOC "feature_set" @@ -1135,7 +1140,30 @@ private: /** * @} */ + /** + * @defgroup Monitor_h_stats Keep track of monitor statistics + * @{ + */ + struct MonStatsEntry { + // data dir + uint64_t kb_total; + uint64_t kb_used; + uint64_t kb_avail; + unsigned int latest_avail_ratio; + utime_t last_update; + }; + + struct MonStats { + MonStatsEntry ours; + map<entity_inst_t,MonStatsEntry> others; + }; + + MonStats stats; + void stats_update(); + /** + * @} + */ Context *probe_timeout_event; // for probing @@ -1215,6 +1243,7 @@ public: friend class PGMonitor; friend class LogMonitor; + boost::intrusive_ptr<QuorumService> health_monitor; // -- sessions -- MonSessionMap session_map; diff --git a/src/mon/QuorumService.h b/src/mon/QuorumService.h new file mode 100644 index 00000000000..054fce67036 --- /dev/null +++ b/src/mon/QuorumService.h @@ -0,0 +1,142 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_MON_QUORUM_SERVICE_H +#define CEPH_MON_QUORUM_SERVICE_H + +#include <boost/intrusive_ptr.hpp> +// Because intusive_ptr clobbers our assert... +#include "include/assert.h" + +#include <errno.h> + +#include "include/types.h" +#include "include/Context.h" +#include "common/RefCountedObj.h" +#include "common/config.h" + +#include "mon/Monitor.h" +#include "messages/MMonQuorumService.h" + +class QuorumService : public RefCountedObject +{ + uint32_t flags; + Context *tick_event; + double tick_period; + + struct C_Tick : public Context { + boost::intrusive_ptr<QuorumService> s; + C_Tick(boost::intrusive_ptr<QuorumService> qs) : s(qs) { } + void finish(int r) { + if (r < 0) + return; + s->tick(); + } + }; + +public: + static const int SERVICE_HEALTH = 0x01; + static const int SERVICE_TIMECHECK = 0x02; + +protected: + Monitor *mon; + epoch_t epoch; + + QuorumService(Monitor *m) : + tick_event(NULL), + tick_period(g_conf->mon_tick_interval), + mon(m), + epoch(0) + { + } + + void cancel_tick() { + if (tick_event) + mon->timer.cancel_event(tick_event); + tick_event = NULL; + } + + void start_tick() { + generic_dout(10) << __func__ << dendl; + + cancel_tick(); + if (tick_period <= 0) + return; + + tick_event = new C_Tick( + boost::intrusive_ptr<QuorumService>(this)); + mon->timer.add_event_after(tick_period, tick_event); + } + + void set_update_period(double t) { + tick_period = t; + } + + bool in_quorum() { + return (mon->is_leader() || mon->is_peon()); + } + + virtual bool service_dispatch(Message *m) = 0; + virtual void service_tick() = 0; + virtual void service_shutdown() = 0; + + virtual void start_epoch() = 0; + virtual void finish_epoch() = 0; + virtual void cleanup() = 0; + +public: + virtual ~QuorumService() { } + QuorumService *get() { + return static_cast<QuorumService *>(RefCountedObject::get()); + } + + void start(epoch_t new_epoch) { + epoch = new_epoch; + start_epoch(); + } + + void finish() { + generic_dout(20) << "QuorumService::finish" << dendl; + finish_epoch(); + } + + epoch_t get_epoch() const { + return epoch; + } + + bool dispatch(MMonQuorumService *m) { + return service_dispatch(m); + } + + void tick() { + service_tick(); + start_tick(); + } + + void shutdown() { + generic_dout(0) << "quorum service shutdown" << dendl; + cancel_tick(); + service_shutdown(); + } + + virtual void init() { } + + virtual void get_health(Formatter *f, + list<pair<health_status_t,string> > *detail) = 0; + virtual int get_type() = 0; + virtual string get_name() const = 0; + +}; +typedef boost::intrusive_ptr<QuorumService> QuorumServiceRef; + +#endif /* CEPH_MON_QUORUM_SERVICE_H */ diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index 48ca35d5322..1f4135c0867 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -15,6 +15,8 @@ #ifndef CEPH_MON_TYPES_H #define CEPH_MON_TYPES_H +#include "include/utime.h" + #define PAXOS_PGMAP 0 // before osd, for pg kick to behave #define PAXOS_MDSMAP 1 #define PAXOS_OSDMAP 2 @@ -37,4 +39,36 @@ inline const char *get_paxos_name(int p) { #define CEPH_MON_ONDISK_MAGIC "ceph mon volume v012" +// data stats + +struct DataStats { + // data dir + uint64_t kb_total; + uint64_t kb_used; + uint64_t kb_avail; + int latest_avail_percent; + utime_t last_update; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + ::encode(kb_total, bl); + ::encode(kb_used, bl); + ::encode(kb_avail, bl); + ::encode(latest_avail_percent, bl); + ::encode(last_update, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &p) { + DECODE_START(1, p); + ::decode(kb_total, p); + ::decode(kb_used, p); + ::decode(kb_avail, p); + ::decode(latest_avail_percent, p); + ::decode(last_update, p); + DECODE_FINISH(p); + } +}; + +WRITE_CLASS_ENCODER(DataStats); + #endif diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 72450d9e602..4b3c16b49da 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -84,6 +84,7 @@ using namespace std; #include "messages/MMonGetMap.h" #include "messages/MMonGetVersion.h" #include "messages/MMonGetVersionReply.h" +#include "messages/MMonHealth.h" #include "messages/MAuth.h" #include "messages/MAuthReply.h" @@ -612,6 +613,10 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot m = new MTimeCheck(); break; + case MSG_MON_HEALTH: + m = new MMonHealth(); + break; + // -- simple messages without payload -- case CEPH_MSG_SHUTDOWN: diff --git a/src/msg/Message.h b/src/msg/Message.h index be9bbfdc329..274537db7e6 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -147,6 +147,7 @@ // *** generic *** #define MSG_TIMECHECK 0x600 +#define MSG_MON_HEALTH 0x601 |