summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2013-03-18 15:53:56 -0700
committerSage Weil <sage@newdream.net>2013-03-18 15:53:56 -0700
commit7e7a19ba18cf36a9d1002dad6277fe27512f2ec7 (patch)
treec87d3d8e540587e298de49cdc9e380ddb4e1501f
parenta2ac9358a32f76f9fa4f922c056d7f8dabe52869 (diff)
parentcecfe411bba37983eabee3da06ab930c8b025358 (diff)
downloadceph-7e7a19ba18cf36a9d1002dad6277fe27512f2ec7.tar.gz
Merge pull request #115 from ceph/wip-4199
Resolves #4199 Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/Makefile.am9
-rw-r--r--src/common/config_opts.h3
-rw-r--r--src/global/signal_handler.cc6
-rw-r--r--src/global/signal_handler.h3
-rw-r--r--src/messages/MMonHealth.h108
-rw-r--r--src/messages/MMonQuorumService.h72
-rw-r--r--src/mon/DataHealthService.cc211
-rw-r--r--src/mon/DataHealthService.h88
-rw-r--r--src/mon/HealthMonitor.cc95
-rw-r--r--src/mon/HealthMonitor.h87
-rw-r--r--src/mon/HealthService.h52
-rw-r--r--src/mon/Monitor.cc17
-rw-r--r--src/mon/Monitor.h29
-rw-r--r--src/mon/QuorumService.h142
-rw-r--r--src/mon/mon_types.h34
-rw-r--r--src/msg/Message.cc5
-rw-r--r--src/msg/Message.h1
17 files changed, 961 insertions, 1 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index b00fe03b0c5..f8593e5bf3f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1364,7 +1364,9 @@ libmon_a_SOURCES = \
mon/AuthMonitor.cc \
mon/Elector.cc \
mon/MonitorStore.cc \
- os/LevelDBStore.cc
+ os/LevelDBStore.cc \
+ mon/HealthMonitor.cc \
+ mon/DataHealthService.cc
libmon_a_CXXFLAGS= ${AM_CXXFLAGS}
noinst_LIBRARIES += libmon.a
@@ -1826,6 +1828,7 @@ noinst_HEADERS = \
messages/MOSDPGScan.h\
messages/MBackfillReserve.h\
messages/MRecoveryReserve.h\
+ messages/MMonHealth.h\
+ messages/MMonQuorumService.h\
messages/MOSDPGTemp.h\
messages/MOSDPGTrim.h\
messages/MOSDPing.h\
@@ -1847,14 +1850,17 @@ noinst_HEADERS = \
messages/MWatchNotify.h\
messages/PaxosServiceMessage.h\
mon/AuthMonitor.h\
+ mon/DataHealthService.h\
mon/Elector.h\
mon/LogMonitor.h\
+ mon/HealthMonitor.h\
mon/MDSMonitor.h\
mon/MonmapMonitor.h\
mon/MonCaps.h\
mon/MonClient.h\
mon/MonMap.h\
mon/Monitor.h\
+ mon/HealthService.h\
mon/MonitorStore.h\
mon/MonitorDBStore.h\
mon/OSDMonitor.h\
@@ -1862,6 +1868,7 @@ noinst_HEADERS = \
mon/PGMonitor.h\
mon/Paxos.h\
mon/PaxosService.h\
+ mon/QuorumService.h\
mon/Session.h\
mon/mon_types.h\
mount/canonicalize.c\
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index e99688342b7..9e832be2db8 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -162,6 +162,9 @@ OPTION(mon_slurp_bytes, OPT_INT, 256*1024) // limit size of slurp messages
OPTION(mon_client_bytes, OPT_U64, 100ul << 20) // client msg data allowed in memory (in bytes)
OPTION(mon_daemon_bytes, OPT_U64, 400ul << 20) // mds, osd message memory cap (in bytes)
OPTION(mon_max_log_entries_per_event, OPT_INT, 4096)
+OPTION(mon_health_data_update_interval, OPT_FLOAT, 60.0)
+OPTION(mon_data_avail_crit, OPT_INT, 5)
+OPTION(mon_data_avail_warn, OPT_INT, 30)
OPTION(mon_sync_trim_timeout, OPT_DOUBLE, 30.0)
OPTION(mon_sync_heartbeat_timeout, OPT_DOUBLE, 30.0)
OPTION(mon_sync_heartbeat_interval, OPT_DOUBLE, 5.0)
diff --git a/src/global/signal_handler.cc b/src/global/signal_handler.cc
index 2a6260da66d..25f1a0a1992 100644
--- a/src/global/signal_handler.cc
+++ b/src/global/signal_handler.cc
@@ -325,6 +325,12 @@ void shutdown_async_signal_handler()
g_signal_handler = NULL;
}
+// Hand signum to the global async signal handler's queue.
+// Asserts that the handler exists (g_signal_handler is non-NULL).
+void queue_async_signal(int signum)
+{
+ assert(g_signal_handler);
+ g_signal_handler->queue_signal(signum);
+}
+
void register_async_signal_handler(int signum, signal_handler_t handler)
{
assert(g_signal_handler);
diff --git a/src/global/signal_handler.h b/src/global/signal_handler.h
index 8acfaed1a4c..3a11f54315e 100644
--- a/src/global/signal_handler.h
+++ b/src/global/signal_handler.h
@@ -35,6 +35,9 @@ void init_async_signal_handler();
/// shutdown async signal handler framework
void shutdown_async_signal_handler();
+/// queue an async signal
+void queue_async_signal(int signum);
+
/// install a safe, async, callback for the given signal
void register_async_signal_handler(int signum, signal_handler_t handler);
void register_async_signal_handler_oneshot(int signum, signal_handler_t handler);
diff --git a/src/messages/MMonHealth.h b/src/messages/MMonHealth.h
new file mode 100644
index 00000000000..2ef3cef519c
--- /dev/null
+++ b/src/messages/MMonHealth.h
@@ -0,0 +1,108 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2012 Inktank, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_MMON_HEALTH_H
+#define CEPH_MMON_HEALTH_H
+
+#include "msg/Message.h"
+#include "messages/MMonQuorumService.h"
+#include "mon/mon_types.h"
+
+/**
+ * Message exchanged between the monitors' health services.
+ *
+ * The payload is the MMonQuorumService header (epoch, round) followed by
+ * service_type, service_op and data_stats.
+ */
+struct MMonHealth : public MMonQuorumService
+{
+  static const int HEAD_VERSION = 1;
+
+  enum {
+    OP_TELL = 1,
+  };
+
+  static const uint32_t FLAG_DATA = 0x01;
+
+  int service_type;
+  int service_op;
+  // NOTE: flags is neither encoded nor decoded below, so it only ever holds
+  // what the local side set via set_flag(). It must therefore be initialized
+  // by every constructor, since print() reads it.
+  uint32_t flags;
+
+  // service specific data
+  DataStats data_stats;
+
+  /// Default constructor, used when a message is about to be decoded.
+  MMonHealth() :
+    MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION),
+    service_type(0),
+    service_op(0),
+    flags(0),
+    data_stats()
+  { }
+  /// Build a message for service @p type with no operation set.
+  MMonHealth(uint32_t type) :
+    MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION),
+    service_type(type),
+    service_op(0),
+    flags(0),
+    data_stats()
+  { }
+  /// Build a message for service @p type carrying operation @p op.
+  MMonHealth(uint32_t type, int op) :
+    MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION),
+    service_type(type),
+    service_op(op),
+    flags(0),
+    data_stats()
+  { }
+
+private:
+  ~MMonHealth() { }
+
+public:
+  const char *get_type_name() const { return "mon_health"; }
+
+  /// Human-readable name of service_op; "???" for anything unknown.
+  const char *get_service_op_name() const {
+    switch (service_op) {
+    case OP_TELL: return "tell";
+    }
+    return "???";
+  }
+
+  void print(ostream &o) const {
+    o << "mon_health( service " << get_service_type()
+      << " op " << get_service_op_name()
+      << " e " << get_epoch() << " r " << get_round()
+      << " flags";
+    if (!flags) {
+      o << " none";
+    } else {
+      if (has_flag(FLAG_DATA)) {
+        o << " data";
+      }
+    }
+    o << " )";
+  }
+
+  int get_service_type() const {
+    return service_type;
+  }
+
+  int get_service_op() const {
+    return service_op;
+  }
+
+  void set_flag(uint32_t f) {
+    flags |= f;
+  }
+
+  bool has_flag(uint32_t f) const {
+    return (flags & f) != 0;
+  }
+
+  /// Decode in the same field order that encode_payload() writes.
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    service_decode(p);
+    ::decode(service_type, p);
+    ::decode(service_op, p);
+    ::decode(data_stats, p);
+  }
+
+  /// Encode the quorum-service header followed by the health fields.
+  void encode_payload(uint64_t features) {
+    service_encode();
+    ::encode(service_type, payload);
+    ::encode(service_op, payload);
+    ::encode(data_stats, payload);
+  }
+
+};
+
+#endif /* CEPH_MMON_HEALTH_H */
diff --git a/src/messages/MMonQuorumService.h b/src/messages/MMonQuorumService.h
new file mode 100644
index 00000000000..6a25ae1e203
--- /dev/null
+++ b/src/messages/MMonQuorumService.h
@@ -0,0 +1,72 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2012 Inktank, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_MMON_QUORUM_SERVICE_H
+#define CEPH_MMON_QUORUM_SERVICE_H
+
+#include "msg/Message.h"
+
+// Base message for quorum services. It carries an epoch and a round and
+// offers service_encode()/service_decode() helpers for derived messages.
+// Its own encode_payload()/decode_payload() assert unconditionally, so only
+// derived message types can actually be encoded or decoded.
+struct MMonQuorumService : public Message
+{
+ epoch_t epoch;
+ version_t round;
+
+ // Both epoch and round start at 0; head/compat are forwarded to Message.
+ MMonQuorumService(int type, int head=1, int compat=1) :
+ Message(type, head, compat),
+ epoch(0),
+ round(0)
+ { }
+
+protected:
+ ~MMonQuorumService() { }
+
+public:
+
+ void set_epoch(epoch_t e) {
+ epoch = e;
+ }
+
+ void set_round(version_t r) {
+ round = r;
+ }
+
+ epoch_t get_epoch() const {
+ return epoch;
+ }
+
+ version_t get_round() const {
+ return round;
+ }
+
+ // Append epoch, then round, to this message's payload.
+ void service_encode() {
+ ::encode(epoch, payload);
+ ::encode(round, payload);
+ }
+
+ // Read epoch, then round, from p (same order as service_encode()).
+ void service_decode(bufferlist::iterator &p) {
+ ::decode(epoch, p);
+ ::decode(round, p);
+ }
+
+ // Derived messages must provide their own encode_payload().
+ void encode_payload(uint64_t features) {
+ assert(0 == "MMonQuorumService message must always be a base class");
+ }
+
+ // Derived messages must provide their own decode_payload().
+ void decode_payload() {
+ assert(0 == "MMonQuorumService message must always be a base class");
+ }
+
+ const char *get_type_name() const { return "quorum_service"; }
+};
+
+#endif /* CEPH_MMON_QUORUM_SERVICE_H */
diff --git a/src/mon/DataHealthService.cc b/src/mon/DataHealthService.cc
new file mode 100644
index 00000000000..d97b458ec03
--- /dev/null
+++ b/src/mon/DataHealthService.cc
@@ -0,0 +1,211 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include <memory>
+#include <tr1/memory>
+#include <errno.h>
+#include <map>
+#include <list>
+#include <string>
+#include <sstream>
+#include <sys/vfs.h>
+
+#include "messages/MMonHealth.h"
+#include "include/types.h"
+#include "include/Context.h"
+#include "include/assert.h"
+#include "common/Formatter.h"
+
+#include "mon/Monitor.h"
+#include "mon/QuorumService.h"
+#include "mon/DataHealthService.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, mon, this)
+// Log-line prefix used by dout in this file (see the dout_prefix define):
+// "mon.<name>@<rank>(<state>).<service name>(<service epoch>) ".
+// Both mon and svc must be non-NULL.
+static ostream& _prefix(std::ostream *_dout, const Monitor *mon,
+ const DataHealthService *svc) {
+ assert(mon != NULL);
+ assert(svc != NULL);
+ return *_dout << "mon." << mon->name << "@" << mon->rank
+ << "(" << mon->get_state_name() << ")." << svc->get_name()
+ << "(" << svc->get_epoch() << ") ";
+}
+
+// Called when a new epoch starts; drops every collected stats entry.
+void DataHealthService::start_epoch()
+{
+ dout(10) << __func__ << " epoch " << get_epoch() << dendl;
+ // We are not bound by election epochs, but we should clear the stats
+ // every time an election is triggered. As far as we know, a monitor might
+ // have been running out of disk space and someone fixed it. We don't want
+ // to hold the cluster back, or confuse the user, because of possibly
+ // outdated stats.
+ stats.clear();
+}
+
+/**
+ * Dump the data-store stats known for each monitor into @p f, inside a
+ * "data_health" object holding a "mons" array.
+ *
+ * @param f      formatter to write to; must not be NULL
+ * @param detail optional list; for every monitor at or below the warn/crit
+ *               thresholds a (status, message) pair is appended
+ */
+void DataHealthService::get_health(Formatter *f,
+    list<pair<health_status_t,string> > *detail)
+{
+  dout(10) << __func__ << dendl;
+  assert(f != NULL);
+
+  f->open_object_section("data_health");
+  f->open_array_section("mons");
+
+  for (map<entity_inst_t,DataStats>::iterator it = stats.begin();
+       it != stats.end(); ++it) {
+    string mon_name = mon->monmap->get_name(it->first.addr);
+    // named so that it does not shadow the 'stats' member map
+    DataStats& mon_stats = it->second;
+
+    // the critical threshold is checked first, so it wins over the warning
+    health_status_t health_status = HEALTH_OK;
+    string health_detail;
+    if (mon_stats.latest_avail_percent <= g_conf->mon_data_avail_crit) {
+      health_status = HEALTH_ERR;
+      health_detail = "shutdown iminent!";
+    } else if (mon_stats.latest_avail_percent <= g_conf->mon_data_avail_warn) {
+      health_status = HEALTH_WARN;
+      health_detail = "low disk space!";
+    }
+
+    if (detail && health_status != HEALTH_OK) {
+      stringstream ss;
+      // plain '%': "\%" is not a valid escape sequence
+      ss << "mon." << mon_name << " addr " << it->first.addr
+         << " has " << mon_stats.latest_avail_percent
+         << "% avail disk space -- " << health_detail;
+      detail->push_back(make_pair(health_status, ss.str()));
+    }
+
+    f->open_object_section(mon_name.c_str());
+    f->dump_string("name", mon_name.c_str());
+    f->dump_int("kb_total", mon_stats.kb_total);
+    f->dump_int("kb_used", mon_stats.kb_used);
+    f->dump_int("kb_avail", mon_stats.kb_avail);
+    f->dump_int("avail_percent", mon_stats.latest_avail_percent);
+    f->dump_stream("last_updated") << mon_stats.last_update;
+    f->dump_stream("health") << health_status;
+    if (health_status != HEALTH_OK)
+      f->dump_string("health_detail", health_detail);
+    f->close_section();
+  }
+
+  f->close_section(); // mons
+  f->close_section(); // data_health
+}
+
+/**
+ * Refresh our own entry in 'stats' from statfs() on g_conf->mon_data.
+ *
+ * On statfs failure the error is logged and the stats are left untouched
+ * (an EIO failure asserts).
+ */
+void DataHealthService::update_stats()
+{
+  struct statfs stbuf;
+  int err = ::statfs(g_conf->mon_data.c_str(), &stbuf);
+  if (err < 0) {
+    // save errno right away: the logging calls below may overwrite it
+    const int saved_errno = errno;
+    assert(saved_errno != EIO);
+    mon->clog.error() << __func__ << " statfs error: "
+                      << cpp_strerror(saved_errno) << "\n";
+    return;
+  }
+
+  entity_inst_t our_inst = mon->monmap->get_inst(mon->name);
+  DataStats& ours = stats[our_inst];
+
+  // do the arithmetic in 64 bits, whatever the width of the statfs fields
+  const uint64_t bsize = stbuf.f_bsize;
+  const uint64_t blocks = stbuf.f_blocks;
+  const uint64_t bfree = stbuf.f_bfree;
+  const uint64_t bavail = stbuf.f_bavail;
+
+  ours.kb_total = blocks * bsize / 1024;
+  ours.kb_used = (blocks - bfree) * bsize / 1024;
+  ours.kb_avail = bavail * bsize / 1024;
+  if (ours.kb_total == 0) {
+    // avoid 0/0: converting the resulting NaN to int is undefined behavior.
+    // A filesystem reporting no blocks is treated as having nothing available.
+    ours.latest_avail_percent = 0;
+  } else {
+    ours.latest_avail_percent = (((float)ours.kb_avail/ours.kb_total)*100);
+  }
+  dout(0) << __func__ << " avail " << ours.latest_avail_percent << "%"
+          << " total " << ours.kb_total << " used " << ours.kb_used
+          << " avail " << ours.kb_avail
+          << dendl;
+  ours.last_update = ceph_clock_now(g_ceph_context);
+}
+
+/**
+ * Send our own data stats to every other monitor in the quorum.
+ *
+ * Does nothing when we are not in the quorum, or when we hold no stats for
+ * ourselves (e.g. statfs failed right after start_epoch() cleared the map).
+ */
+void DataHealthService::share_stats()
+{
+  dout(10) << __func__ << dendl;
+  if (!in_quorum())
+    return;
+
+  entity_inst_t our_inst = mon->monmap->get_inst(mon->rank);
+  map<entity_inst_t,DataStats>::iterator ours = stats.find(our_inst);
+  if (ours == stats.end()) {
+    // a failed stats update is an environmental problem, not a programming
+    // error: skip this round instead of asserting
+    dout(1) << __func__ << " no local stats available -- nothing to share"
+            << dendl;
+    return;
+  }
+
+  const set<int>& quorum = mon->get_quorum();
+  for (set<int>::const_iterator it = quorum.begin();
+       it != quorum.end(); ++it) {
+    // do not send to ourselves
+    if (mon->monmap->get_name(*it) == mon->name)
+      continue;
+    entity_inst_t inst = mon->monmap->get_inst(*it);
+    MMonHealth *m = new MMonHealth(HealthService::SERVICE_HEALTH_DATA,
+                                   MMonHealth::OP_TELL);
+    m->data_stats = ours->second;
+    dout(20) << __func__ << " send " << *m << " to " << inst << dendl;
+    mon->messenger->send_message(m, inst);
+  }
+}
+
+/**
+ * Periodic work: refresh our stats, share them with the quorum, then act on
+ * the result. At or below mon_data_avail_crit the monitor is shut down; at
+ * or below mon_data_avail_warn a warning is sent to the cluster log.
+ */
+void DataHealthService::service_tick()
+{
+  dout(10) << __func__ << dendl;
+
+  update_stats();
+
+  // Look our entry up without inserting. If update_stats() failed and we
+  // have no entry, operator[] would create a zeroed one, which reads as
+  // "0% available" and would force a shutdown with a misleading message.
+  map<entity_inst_t,DataStats>::iterator p =
+    stats.find(mon->monmap->get_inst(mon->name));
+  if (p == stats.end()) {
+    dout(1) << __func__ << " no stats for ourselves -- skipping this round"
+            << dendl;
+    return;
+  }
+
+  if (in_quorum())
+    share_stats();
+
+  DataStats &ours = p->second;
+
+  if (ours.latest_avail_percent <= g_conf->mon_data_avail_crit) {
+    mon->clog.error()
+      << "reached critical levels of available space on data store"
+      << " -- shutdown!\n";
+    force_shutdown();
+    return;
+  }
+
+  // we must backoff these warnings, and track how much data is being
+  // consumed in-between reports to assess if it's worth to log this info,
+  // otherwise we may very well contribute to the consumption of the
+  // already low available disk space.
+  if (ours.latest_avail_percent <= g_conf->mon_data_avail_warn) {
+    // plain '%': "\%" is not a valid escape sequence
+    mon->clog.warn()
+      << "reached concerning levels of available space on data store"
+      << " (" << ours.latest_avail_percent << "% free)\n";
+  }
+}
+
+// Record the data stats a peer sent us, keyed by the message's source
+// instance; an existing entry for that source is overwritten.
+// The message must carry OP_TELL. This function does not release m.
+void DataHealthService::handle_tell(MMonHealth *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ assert(m->get_service_op() == MMonHealth::OP_TELL);
+
+ stats[m->get_source_inst()] = m->data_stats;
+}
+
+/**
+ * Handle a health message addressed to this service.
+ *
+ * The message reference is always dropped here. Returns false when the
+ * message was discarded because we are not in the quorum, true otherwise.
+ * An unknown operation asserts.
+ */
+bool DataHealthService::service_dispatch(MMonHealth *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+  assert(m->get_service_type() == get_type());
+
+  const bool accepted = in_quorum();
+  if (!accepted) {
+    dout(1) << __func__ << " not in quorum -- drop message" << dendl;
+  } else if (m->service_op == MMonHealth::OP_TELL) {
+    // someone is telling us their stats
+    handle_tell(m);
+  } else {
+    dout(0) << __func__ << " unknown op " << m->service_op << dendl;
+    assert(0 == "Unknown service op");
+  }
+
+  m->put();
+  return accepted;
+}
diff --git a/src/mon/DataHealthService.h b/src/mon/DataHealthService.h
new file mode 100644
index 00000000000..d54af792d9c
--- /dev/null
+++ b/src/mon/DataHealthService.h
@@ -0,0 +1,88 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_MON_DATA_HEALTH_SERVICE_H
+#define CEPH_MON_DATA_HEALTH_SERVICE_H
+
+#include <boost/intrusive_ptr.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
+#include <errno.h>
+
+#include "include/types.h"
+#include "include/Context.h"
+#include "mon/mon_types.h"
+#include "mon/QuorumService.h"
+#include "mon/HealthService.h"
+#include "common/Formatter.h"
+#include "common/config.h"
+#include "global/signal_handler.h"
+
+class MMonHealth;
+
+// Health service that tracks data-store disk usage for the monitors.
+// Stats are kept per monitor instance in 'stats'; the tick period is taken
+// from mon_health_data_update_interval.
+class DataHealthService :
+ public HealthService
+{
+ // latest known stats, keyed by monitor instance
+ map<entity_inst_t,DataStats> stats;
+ void handle_tell(MMonHealth *m);
+ void update_stats();
+ void share_stats();
+
+ // Request a shutdown by queueing SIGINT on the async signal handler.
+ void force_shutdown() {
+ generic_dout(0) << "** Shutdown via Data Health Service **" << dendl;
+ queue_async_signal(SIGINT);
+ }
+
+protected:
+ virtual void service_tick();
+ // Generic-Message overload: must not be called, asserts unconditionally.
+ virtual bool service_dispatch(Message *m) {
+ assert(0 == "We should never reach this; only the function below");
+ return false;
+ }
+ virtual bool service_dispatch(MMonHealth *m);
+ virtual void service_shutdown() { }
+
+ virtual void start_epoch();
+ virtual void finish_epoch() { }
+ virtual void cleanup() { }
+
+public:
+ DataHealthService(Monitor *m) :
+ HealthService(m)
+ {
+ set_update_period(g_conf->mon_health_data_update_interval);
+ }
+ virtual ~DataHealthService() { }
+ DataHealthService *get() {
+ return static_cast<DataHealthService *>(RefCountedObject::get());
+ }
+
+ // Starts the periodic tick.
+ virtual void init() {
+ generic_dout(20) << "data_health " << __func__ << dendl;
+ start_tick();
+ }
+
+ virtual void get_health(Formatter *f,
+ list<pair<health_status_t,string> > *detail);
+
+ virtual int get_type() {
+ return HealthService::SERVICE_HEALTH_DATA;
+ }
+
+ virtual string get_name() const {
+ return "data_health";
+ }
+};
+typedef boost::intrusive_ptr<DataHealthService> DataHealthServiceRef;
+
+#endif /* CEPH_MON_DATA_HEALTH_SERVICE_H */
diff --git a/src/mon/HealthMonitor.cc b/src/mon/HealthMonitor.cc
new file mode 100644
index 00000000000..cf3d962b2c4
--- /dev/null
+++ b/src/mon/HealthMonitor.cc
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sstream>
+#include <stdlib.h>
+#include <limits.h>
+
+#include <boost/intrusive_ptr.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
+
+#include "mon/Monitor.h"
+#include "mon/QuorumService.h"
+#include "mon/HealthService.h"
+#include "mon/HealthMonitor.h"
+#include "mon/DataHealthService.h"
+
+#include "messages/MMonHealth.h"
+
+#include "common/config.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, mon, this)
+// Log-line prefix used by dout in this file (see the dout_prefix define):
+// "mon.<name>@<rank>(<state>).<monitor name>(<monitor epoch>) ".
+// NOTE(review): mon and hmon are dereferenced without a NULL check here.
+static ostream& _prefix(std::ostream *_dout, const Monitor *mon,
+ const HealthMonitor *hmon) {
+ return *_dout << "mon." << mon->name << "@" << mon->rank
+ << "(" << mon->get_state_name() << ")." << hmon->get_name()
+ << "(" << hmon->get_epoch() << ") ";
+}
+
+/**
+ * Register the health services and initialize each of them.
+ * Must be called while no service is registered yet.
+ */
+void HealthMonitor::init()
+{
+  dout(10) << __func__ << dendl;
+  assert(services.empty());
+
+  HealthServiceRef data_svc(new DataHealthService(mon));
+  services[HealthService::SERVICE_HEALTH_DATA] = data_svc;
+
+  map<int,HealthServiceRef>::iterator svc = services.begin();
+  while (svc != services.end()) {
+    svc->second->init();
+    ++svc;
+  }
+}
+
+/**
+ * Route a MSG_MON_HEALTH message to the service registered for its type.
+ *
+ * When no such service is registered the message is released and false is
+ * returned; otherwise the result of the service's own dispatch is returned.
+ */
+bool HealthMonitor::service_dispatch(Message *m)
+{
+  assert(m->get_type() == MSG_MON_HEALTH);
+  MMonHealth *health_msg = static_cast<MMonHealth*>(m);
+  const int wanted = health_msg->get_service_type();
+
+  map<int,HealthServiceRef>::iterator svc = services.find(wanted);
+  if (svc != services.end())
+    return svc->second->service_dispatch(health_msg);
+
+  dout(1) << __func__ << " service type " << wanted
+          << " not registered -- drop message!" << dendl;
+  m->put();
+  return false;
+}
+
+/// Shut every registered service down, then forget all of them.
+void HealthMonitor::service_shutdown()
+{
+  dout(0) << "HealthMonitor::service_shutdown "
+          << services.size() << " services" << dendl;
+
+  map<int,HealthServiceRef>::iterator svc = services.begin();
+  while (svc != services.end()) {
+    svc->second->shutdown();
+    ++svc;
+  }
+  services.clear();
+}
+
+/**
+ * Dump the health of every registered service into @p f, inside a "health"
+ * object holding a "health_services" array.
+ *
+ * @param f      formatter to write to; must not be NULL
+ * @param detail passed through unchanged to each service
+ */
+void HealthMonitor::get_health(Formatter *f,
+    list<pair<health_status_t,string> > *detail)
+{
+  assert(f != NULL);
+  f->open_object_section("health");
+  f->open_array_section("health_services");
+
+  map<int,HealthServiceRef>::iterator svc = services.begin();
+  while (svc != services.end()) {
+    svc->second->get_health(f, detail);
+    ++svc;
+  }
+
+  f->close_section(); // health_services
+  f->close_section(); // health
+}
+
diff --git a/src/mon/HealthMonitor.h b/src/mon/HealthMonitor.h
new file mode 100644
index 00000000000..85581edde9b
--- /dev/null
+++ b/src/mon/HealthMonitor.h
@@ -0,0 +1,87 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_HEALTH_MONITOR_H
+#define CEPH_HEALTH_MONITOR_H
+
+#include <boost/intrusive_ptr.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
+
+#include "mon/Monitor.h"
+#include "mon/QuorumService.h"
+#include "mon/HealthService.h"
+
+#include "messages/MMonHealth.h"
+
+#include "common/config.h"
+#include "common/Formatter.h"
+
+// Quorum service that owns the individual health services (keyed by their
+// service type) and fans epoch start/finish, dispatch, shutdown and
+// get_health calls out to them.
+class HealthMonitor : public QuorumService
+{
+ // registered health services, keyed by service type
+ map<int,HealthServiceRef> services;
+
+protected:
+ virtual void service_shutdown();
+
+public:
+ HealthMonitor(Monitor *m) : QuorumService(m) { }
+ virtual ~HealthMonitor() { }
+ HealthMonitor *get() {
+ return static_cast<HealthMonitor *>(RefCountedObject::get());
+ }
+
+
+ /**
+ * @defgroup HealthMonitor_Inherited_h Inherited abstract methods
+ * @{
+ */
+ virtual void init();
+ virtual void get_health(Formatter *f,
+ list<pair<health_status_t,string> > *detail);
+ virtual bool service_dispatch(Message *m);
+
+ // Start every registered service on our current epoch.
+ virtual void start_epoch() {
+ for (map<int,HealthServiceRef>::iterator it = services.begin();
+ it != services.end(); ++it) {
+ it->second->start(get_epoch());
+ }
+ }
+
+ // Tell every registered service that the epoch is over.
+ virtual void finish_epoch() {
+ generic_dout(20) << "HealthMonitor::finish_epoch()" << dendl;
+ for (map<int,HealthServiceRef>::iterator it = services.begin();
+ it != services.end(); ++it) {
+ assert(it->second.get() != NULL);
+ it->second->finish();
+ }
+ }
+
+ virtual void cleanup() { }
+ virtual void service_tick() { }
+
+ virtual int get_type() {
+ return QuorumService::SERVICE_HEALTH;
+ }
+
+ virtual string get_name() const {
+ return "health";
+ }
+
+ /**
+ * @} // HealthMonitor_Inherited_h
+ */
+};
+typedef boost::intrusive_ptr<HealthMonitor> HealthMonitorRef;
+
+#endif // CEPH_HEALTH_MONITOR_H
diff --git a/src/mon/HealthService.h b/src/mon/HealthService.h
new file mode 100644
index 00000000000..a4de07a1a63
--- /dev/null
+++ b/src/mon/HealthService.h
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_MON_HEALTH_SERVICE_H
+#define CEPH_MON_HEALTH_SERVICE_H
+
+#include <boost/intrusive_ptr.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
+
+#include "mon/Monitor.h"
+#include "mon/QuorumService.h"
+
+#include "messages/MMonHealth.h"
+
+#include "common/config.h"
+
+// Abstract base for the individual health services. It narrows the generic
+// Message dispatch down to MMonHealth.
+struct HealthService : public QuorumService
+{
+ static const int SERVICE_HEALTH_DATA = 0x01;
+
+ HealthService(Monitor *m) : QuorumService(m) { }
+ virtual ~HealthService() { }
+
+ // Forwards to the MMonHealth overload.
+ // NOTE(review): the cast is unchecked; callers are presumably expected to
+ // pass only MSG_MON_HEALTH messages -- confirm.
+ virtual bool service_dispatch(Message *m) {
+ return service_dispatch(static_cast<MMonHealth*>(m));
+ }
+
+ virtual bool service_dispatch(MMonHealth *m) = 0;
+
+public:
+ HealthService *get() {
+ return static_cast<HealthService *>(RefCountedObject::get());
+ }
+ virtual void get_health(Formatter *f,
+ list<pair<health_status_t,string> > *detail) = 0;
+ virtual int get_type() = 0;
+ virtual string get_name() const = 0;
+};
+typedef boost::intrusive_ptr<HealthService> HealthServiceRef;
+
+#endif // CEPH_MON_HEALTH_SERVICE_H
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 769381ba499..9f35e0df615 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -49,6 +49,7 @@
#include "messages/MAuthReply.h"
#include "messages/MTimeCheck.h"
+#include "messages/MMonHealth.h"
#include "common/strtol.h"
#include "common/ceph_argparse.h"
@@ -68,6 +69,8 @@
#include "PGMonitor.h"
#include "LogMonitor.h"
#include "AuthMonitor.h"
+#include "mon/QuorumService.h"
+#include "mon/HealthMonitor.h"
#include "auth/AuthMethodList.h"
#include "auth/KeyRing.h"
@@ -163,6 +166,8 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
+ health_monitor = QuorumServiceRef(new HealthMonitor(this));
+
mon_caps = new MonCaps();
mon_caps->set_allow_all(true);
mon_caps->text = "allow *";
@@ -418,6 +423,7 @@ int Monitor::preinit()
}
init_paxos();
+ health_monitor->init();
// we need to bootstrap authentication keys so we can form an
// initial quorum.
@@ -565,6 +571,7 @@ void Monitor::shutdown()
// clean up
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
(*p)->shutdown();
+ health_monitor->shutdown();
finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
@@ -683,6 +690,7 @@ void Monitor::reset()
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
(*p)->restart();
+ health_monitor->finish();
}
set<string> Monitor::get_sync_targets_names() {
@@ -1964,6 +1972,7 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features)
paxos->leader_init();
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
(*p)->election_finished();
+ health_monitor->start(epoch);
finish_election();
if (monmap->size() > 1)
@@ -1997,6 +2006,7 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features
paxos->peon_init();
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
(*p)->election_finished();
+ health_monitor->start(epoch);
finish_election();
}
@@ -2283,6 +2293,9 @@ void Monitor::get_health(string& status, bufferlist *detailbl, Formatter *f)
if (f)
f->close_section();
+ if (f)
+ health_monitor->get_health(f, (detailbl ? &detail : NULL));
+
stringstream fss;
fss << overall;
status = fss.str() + ss.str();
@@ -3174,6 +3187,10 @@ bool Monitor::_ms_dispatch(Message *m)
handle_timecheck(static_cast<MTimeCheck *>(m));
break;
+ case MSG_MON_HEALTH:
+ health_monitor->dispatch(static_cast<MMonHealth *>(m));
+ break;
+
default:
ret = false;
}
diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h
index 9d2bd598823..55f27c7822d 100644
--- a/src/mon/Monitor.h
+++ b/src/mon/Monitor.h
@@ -51,6 +51,9 @@
#include <memory>
#include <tr1/memory>
#include <errno.h>
+#include <boost/intrusive_ptr.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
#define CEPH_MON_PROTOCOL 10 /* cluster internal */
@@ -83,6 +86,7 @@ enum {
l_cluster_last,
};
+class QuorumService;
class PaxosService;
class PerfCounters;
@@ -97,6 +101,7 @@ class MAuthRotating;
class MRoute;
class MForward;
class MTimeCheck;
+class MMonHealth;
#define COMPAT_SET_LOC "feature_set"
@@ -1135,7 +1140,30 @@ private:
/**
* @}
*/
+ /**
+ * @defgroup Monitor_h_stats Keep track of monitor statistics
+ * @{
+ */
+ // Data-store usage numbers for a single monitor.
+ // NOTE(review): the fields closely mirror DataStats in mon/mon_types.h, and
+ // no use of these types or of stats_update() is visible in this diff --
+ // confirm whether they are still needed.
+ struct MonStatsEntry {
+ // data dir
+ uint64_t kb_total;
+ uint64_t kb_used;
+ uint64_t kb_avail;
+ unsigned int latest_avail_ratio;
+ utime_t last_update;
+ };
+
+ // Our own entry plus the entries of other monitors, keyed by instance.
+ struct MonStats {
+ MonStatsEntry ours;
+ map<entity_inst_t,MonStatsEntry> others;
+ };
+
+ MonStats stats;
+ void stats_update();
+ /**
+ * @}
+ */
Context *probe_timeout_event; // for probing
@@ -1215,6 +1243,7 @@ public:
friend class PGMonitor;
friend class LogMonitor;
+ boost::intrusive_ptr<QuorumService> health_monitor;
// -- sessions --
MonSessionMap session_map;
diff --git a/src/mon/QuorumService.h b/src/mon/QuorumService.h
new file mode 100644
index 00000000000..054fce67036
--- /dev/null
+++ b/src/mon/QuorumService.h
@@ -0,0 +1,142 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_MON_QUORUM_SERVICE_H
+#define CEPH_MON_QUORUM_SERVICE_H
+
+#include <boost/intrusive_ptr.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
+
+#include <errno.h>
+
+#include "include/types.h"
+#include "include/Context.h"
+#include "common/RefCountedObj.h"
+#include "common/config.h"
+
+#include "mon/Monitor.h"
+#include "messages/MMonQuorumService.h"
+
+/**
+ * Base class for services that run on the monitors of a quorum.
+ *
+ * It provides a periodic tick (scheduled on the monitor's timer), epoch
+ * start/finish notifications and message dispatch; derived classes fill in
+ * the service_* and *_epoch hooks.
+ */
+class QuorumService : public RefCountedObject
+{
+  uint32_t flags;
+  Context *tick_event;
+  double tick_period;
+
+  /// Timer callback; holds a reference to the service it ticks.
+  struct C_Tick : public Context {
+    boost::intrusive_ptr<QuorumService> s;
+    C_Tick(boost::intrusive_ptr<QuorumService> qs) : s(qs) { }
+    void finish(int r) {
+      // a negative result means the tick must not run
+      if (r < 0)
+        return;
+      s->tick();
+    }
+  };
+
+public:
+  static const int SERVICE_HEALTH = 0x01;
+  static const int SERVICE_TIMECHECK = 0x02;
+
+protected:
+  Monitor *mon;
+  epoch_t epoch;
+
+  /// The tick period defaults to mon_tick_interval; see set_update_period().
+  QuorumService(Monitor *m) :
+    flags(0),  // was left uninitialized
+    tick_event(NULL),
+    tick_period(g_conf->mon_tick_interval),
+    mon(m),
+    epoch(0)
+  {
+  }
+
+  /// Cancel the pending tick, if any.
+  void cancel_tick() {
+    if (tick_event)
+      mon->timer.cancel_event(tick_event);
+    tick_event = NULL;
+  }
+
+  /// (Re)schedule the tick; a non-positive period leaves ticking disabled.
+  void start_tick() {
+    generic_dout(10) << __func__ << dendl;
+
+    cancel_tick();
+    if (tick_period <= 0)
+      return;
+
+    tick_event = new C_Tick(
+      boost::intrusive_ptr<QuorumService>(this));
+    mon->timer.add_event_after(tick_period, tick_event);
+  }
+
+  /// Set the tick period; it is picked up by the next start_tick().
+  void set_update_period(double t) {
+    tick_period = t;
+  }
+
+  /// True when the monitor is either the leader or a peon.
+  bool in_quorum() {
+    return (mon->is_leader() || mon->is_peon());
+  }
+
+  virtual bool service_dispatch(Message *m) = 0;
+  virtual void service_tick() = 0;
+  virtual void service_shutdown() = 0;
+
+  virtual void start_epoch() = 0;
+  virtual void finish_epoch() = 0;
+  virtual void cleanup() = 0;
+
+public:
+  virtual ~QuorumService() { }
+  QuorumService *get() {
+    return static_cast<QuorumService *>(RefCountedObject::get());
+  }
+
+  /// Record the new epoch, then notify the derived class.
+  void start(epoch_t new_epoch) {
+    epoch = new_epoch;
+    start_epoch();
+  }
+
+  void finish() {
+    generic_dout(20) << "QuorumService::finish" << dendl;
+    finish_epoch();
+  }
+
+  epoch_t get_epoch() const {
+    return epoch;
+  }
+
+  bool dispatch(MMonQuorumService *m) {
+    return service_dispatch(m);
+  }
+
+  /// Run the service's periodic work, then schedule the next tick.
+  void tick() {
+    service_tick();
+    start_tick();
+  }
+
+  /// Stop ticking, then let the derived class shut down.
+  void shutdown() {
+    generic_dout(0) << "quorum service shutdown" << dendl;
+    cancel_tick();
+    service_shutdown();
+  }
+
+  virtual void init() { }
+
+  virtual void get_health(Formatter *f,
+                          list<pair<health_status_t,string> > *detail) = 0;
+  virtual int get_type() = 0;
+  virtual string get_name() const = 0;
+
+};
+typedef boost::intrusive_ptr<QuorumService> QuorumServiceRef;
+
+#endif /* CEPH_MON_QUORUM_SERVICE_H */
diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h
index 48ca35d5322..1f4135c0867 100644
--- a/src/mon/mon_types.h
+++ b/src/mon/mon_types.h
@@ -15,6 +15,8 @@
#ifndef CEPH_MON_TYPES_H
#define CEPH_MON_TYPES_H
+#include "include/utime.h"
+
#define PAXOS_PGMAP 0 // before osd, for pg kick to behave
#define PAXOS_MDSMAP 1
#define PAXOS_OSDMAP 2
@@ -37,4 +39,36 @@ inline const char *get_paxos_name(int p) {
#define CEPH_MON_ONDISK_MAGIC "ceph mon volume v012"
+// data stats
+
+/**
+ * Disk usage of a monitor's data store, as exchanged between monitors.
+ * Sizes are in KB; latest_avail_percent is kb_avail relative to kb_total.
+ */
+struct DataStats {
+  // data dir
+  uint64_t kb_total;
+  uint64_t kb_used;
+  uint64_t kb_avail;
+  int latest_avail_percent;
+  utime_t last_update;
+
+  // Start from zeroed numbers so that a default-constructed instance (e.g.
+  // the member of a freshly built message) never exposes indeterminate values.
+  DataStats() :
+    kb_total(0),
+    kb_used(0),
+    kb_avail(0),
+    latest_avail_percent(0)
+  { }
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(kb_total, bl);
+    ::encode(kb_used, bl);
+    ::encode(kb_avail, bl);
+    ::encode(latest_avail_percent, bl);
+    ::encode(last_update, bl);
+    ENCODE_FINISH(bl);
+  }
+  // fields are read back in the order encode() writes them
+  void decode(bufferlist::iterator &p) {
+    DECODE_START(1, p);
+    ::decode(kb_total, p);
+    ::decode(kb_used, p);
+    ::decode(kb_avail, p);
+    ::decode(latest_avail_percent, p);
+    ::decode(last_update, p);
+    DECODE_FINISH(p);
+  }
+};
+
+WRITE_CLASS_ENCODER(DataStats);
+
#endif
diff --git a/src/msg/Message.cc b/src/msg/Message.cc
index 72450d9e602..4b3c16b49da 100644
--- a/src/msg/Message.cc
+++ b/src/msg/Message.cc
@@ -84,6 +84,7 @@ using namespace std;
#include "messages/MMonGetMap.h"
#include "messages/MMonGetVersion.h"
#include "messages/MMonGetVersionReply.h"
+#include "messages/MMonHealth.h"
#include "messages/MAuth.h"
#include "messages/MAuthReply.h"
@@ -612,6 +613,10 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
m = new MTimeCheck();
break;
+ case MSG_MON_HEALTH:
+ m = new MMonHealth();
+ break;
+
// -- simple messages without payload --
case CEPH_MSG_SHUTDOWN:
diff --git a/src/msg/Message.h b/src/msg/Message.h
index be9bbfdc329..274537db7e6 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -147,6 +147,7 @@
// *** generic ***
#define MSG_TIMECHECK 0x600
+#define MSG_MON_HEALTH 0x601