summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-06-19 11:33:28 -0700
committerSage Weil <sage@inktank.com>2013-06-19 11:33:28 -0700
commit02b3c552659299c7c7c2e789f0660f0add2f639f (patch)
tree13179181b2dc50b8722864e97ba8ce9999b26fe2
parentfd83bc3f5ead56c77bec9d0f51ed942f6b9b752e (diff)
parent392a8e21f8571b410c85be2129ef62dd6fc52b54 (diff)
downloadceph-02b3c552659299c7c7c2e789f0660f0add2f639f.tar.gz
Merge pull request #342 from ceph/wip-mon
Reviewed-by: Joao Eduardo Luis <joao.luis@inktank.com>
-rw-r--r--src/mon/AuthMonitor.cc3
-rw-r--r--src/mon/AuthMonitor.h2
-rw-r--r--src/mon/LogMonitor.cc17
-rw-r--r--src/mon/LogMonitor.h2
-rw-r--r--src/mon/MDSMonitor.cc3
-rw-r--r--src/mon/MDSMonitor.h2
-rw-r--r--src/mon/Monitor.cc25
-rw-r--r--src/mon/Monitor.h1
-rw-r--r--src/mon/MonmapMonitor.cc51
-rw-r--r--src/mon/MonmapMonitor.h2
-rw-r--r--src/mon/OSDMonitor.cc3
-rw-r--r--src/mon/OSDMonitor.h2
-rw-r--r--src/mon/PGMonitor.cc3
-rw-r--r--src/mon/PGMonitor.h2
-rw-r--r--src/mon/Paxos.cc79
-rw-r--r--src/mon/Paxos.h76
-rw-r--r--src/mon/PaxosService.cc35
-rw-r--r--src/mon/PaxosService.h91
18 files changed, 185 insertions, 214 deletions
diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc
index 6af1305ada1..5fa8644f393 100644
--- a/src/mon/AuthMonitor.cc
+++ b/src/mon/AuthMonitor.cc
@@ -69,7 +69,6 @@ void AuthMonitor::tick()
{
if (!is_active()) return;
- update_from_paxos();
dout(10) << *this << dendl;
if (!mon->is_leader()) return;
@@ -112,7 +111,7 @@ void AuthMonitor::create_initial()
pending_auth.push_back(inc);
}
-void AuthMonitor::update_from_paxos()
+void AuthMonitor::update_from_paxos(bool *need_bootstrap)
{
dout(10) << __func__ << dendl;
version_t version = get_version();
diff --git a/src/mon/AuthMonitor.h b/src/mon/AuthMonitor.h
index 9368fcd8613..5c17105692b 100644
--- a/src/mon/AuthMonitor.h
+++ b/src/mon/AuthMonitor.h
@@ -130,7 +130,7 @@ private:
void on_active();
bool should_propose(double& delay);
void create_initial();
- void update_from_paxos();
+ void update_from_paxos(bool *need_bootstrap);
void create_pending(); // prepare a new pending
bool prepare_global_id(MMonGlobalID *m);
void increase_max_global_id();
diff --git a/src/mon/LogMonitor.cc b/src/mon/LogMonitor.cc
index 0a62be8bbba..91ca497a744 100644
--- a/src/mon/LogMonitor.cc
+++ b/src/mon/LogMonitor.cc
@@ -70,7 +70,6 @@ void LogMonitor::tick()
{
if (!is_active()) return;
- update_from_paxos();
dout(10) << *this << dendl;
if (!mon->is_leader()) return;
@@ -91,7 +90,7 @@ void LogMonitor::create_initial()
pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
}
-void LogMonitor::update_from_paxos()
+void LogMonitor::update_from_paxos(bool *need_bootstrap)
{
dout(10) << __func__ << dendl;
version_t version = get_version();
@@ -106,13 +105,13 @@ void LogMonitor::update_from_paxos()
version_t latest_full = get_version_latest_full();
dout(10) << __func__ << " latest full " << latest_full << dendl;
if ((latest_full > 0) && (latest_full > summary.version)) {
- bufferlist latest_bl;
- get_version_full(latest_full, latest_bl);
- assert(latest_bl.length() != 0);
- dout(7) << __func__ << " loading summary e" << latest_full << dendl;
- bufferlist::iterator p = latest_bl.begin();
- ::decode(summary, p);
- dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
+ bufferlist latest_bl;
+ get_version_full(latest_full, latest_bl);
+ assert(latest_bl.length() != 0);
+ dout(7) << __func__ << " loading summary e" << latest_full << dendl;
+ bufferlist::iterator p = latest_bl.begin();
+ ::decode(summary, p);
+ dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
}
// walk through incrementals
diff --git a/src/mon/LogMonitor.h b/src/mon/LogMonitor.h
index 9eee758d47f..e20c81e227b 100644
--- a/src/mon/LogMonitor.h
+++ b/src/mon/LogMonitor.h
@@ -34,7 +34,7 @@ private:
LogSummary pending_summary, summary;
void create_initial();
- void update_from_paxos();
+ void update_from_paxos(bool *need_bootstrap);
void create_pending(); // prepare a new pending
// propose pending update to peers
void encode_pending(MonitorDBStore::Transaction *t);
diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc
index 0314f2b2e4d..5f4582fdce3 100644
--- a/src/mon/MDSMonitor.cc
+++ b/src/mon/MDSMonitor.cc
@@ -84,7 +84,7 @@ void MDSMonitor::create_initial()
}
-void MDSMonitor::update_from_paxos()
+void MDSMonitor::update_from_paxos(bool *need_bootstrap)
{
version_t version = get_version();
if (version == mdsmap.epoch)
@@ -1040,7 +1040,6 @@ void MDSMonitor::tick()
// ...if i am an active leader
if (!is_active()) return;
- update_from_paxos();
dout(10) << mdsmap << dendl;
bool do_propose = false;
diff --git a/src/mon/MDSMonitor.h b/src/mon/MDSMonitor.h
index 52841cfff10..b6ca84022af 100644
--- a/src/mon/MDSMonitor.h
+++ b/src/mon/MDSMonitor.h
@@ -68,7 +68,7 @@ class MDSMonitor : public PaxosService {
// service methods
void create_initial();
- void update_from_paxos();
+ void update_from_paxos(bool *need_bootstrap);
void create_pending();
void encode_pending(MonitorDBStore::Transaction *t);
// we don't require full versions; don't encode any.
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index ae23b4baf3b..b74e821b780 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -521,17 +521,21 @@ void Monitor::init_paxos()
paxos->init();
// update paxos
- for (int i = 0; i < PAXOS_NUM; ++i) {
- if (paxos->is_consistent()) {
- paxos_service[i]->update_from_paxos();
+ if (paxos->is_consistent()) {
+ refresh_from_paxos(NULL);
+
+ // init services
+ for (int i = 0; i < PAXOS_NUM; ++i) {
+ paxos_service[i]->init();
}
}
+}
- // init services
+void Monitor::refresh_from_paxos(bool *need_bootstrap)
+{
+ dout(10) << __func__ << dendl;
for (int i = 0; i < PAXOS_NUM; ++i) {
- if (paxos->is_consistent()) {
- paxos_service[i]->init();
- }
+ paxos_service[i]->refresh(need_bootstrap);
}
}
@@ -3407,13 +3411,6 @@ bool Monitor::_ms_dispatch(Message *m)
}
paxos->dispatch((PaxosServiceMessage*)m);
-
- // make sure service finds out about any state changes
- if (paxos->is_active()) {
- vector<PaxosService*>::iterator service_it = paxos_service.begin();
- for ( ; service_it != paxos_service.end(); ++service_it)
- (*service_it)->update_from_paxos();
- }
}
break;
diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h
index 1ce191584a4..a9420ddefca 100644
--- a/src/mon/Monitor.h
+++ b/src/mon/Monitor.h
@@ -1414,6 +1414,7 @@ public:
int preinit();
int init();
void init_paxos();
+ void refresh_from_paxos(bool *need_bootstrap);
void shutdown();
void tick();
diff --git a/src/mon/MonmapMonitor.cc b/src/mon/MonmapMonitor.cc
index cbcefae104f..7722518b465 100644
--- a/src/mon/MonmapMonitor.cc
+++ b/src/mon/MonmapMonitor.cc
@@ -47,7 +47,7 @@ void MonmapMonitor::create_initial()
pending_map.epoch = 1;
}
-void MonmapMonitor::update_from_paxos()
+void MonmapMonitor::update_from_paxos(bool *need_bootstrap)
{
version_t version = get_version();
if (version <= mon->monmap->get_epoch())
@@ -56,46 +56,10 @@ void MonmapMonitor::update_from_paxos()
dout(10) << __func__ << " version " << version
<< ", my v " << mon->monmap->epoch << dendl;
- /* It becomes clear here that we used the stashed version as a consistency
- * mechanism. Take the 'if' we use: if our latest committed version is
- * greater than 0 (i.e., exists one), and this version is different from
- * our stashed version, then we will take the stashed monmap as our owm.
- *
- * This is cleary to address the case in which we have a failure during
- * the old MonitorStore updates. If a stashed version exists and it has
- * a grater value than the last committed version, it means something
- * went awry, and we did stashed a version (either after updating paxos
- * and before proposing a new value, or during paxos itself) but it
- * never became the last committed (for instance, because the system failed
- * in the mean time).
- *
- * We no longer need to address these concerns. We are using transactions
- * now and it should be the Paxos applying them. If the Paxos applies a
- * transaction with the value we proposed, then it will be consistent
- * with the Paxos values themselves. No need to hack our way in the
- * store and create stashed versions to handle inconsistencies that are
- * addressed by our MonitorDBStore.
- *
- * NOTE: this is not entirely true for the remaining services. In this one,
- * the MonmapMonitor, we don't keep incrementals and each version is a full
- * monmap. In the remaining services however, we keep mostly incrementals and
- * we used to stash full versions of each map/summary. We still do it. We
- * just don't need to do it here. Just check the code below and compare it
- * with the code further down the line where we 'get' the latest committed
- * version: it's the same code.
- *
- version_t latest_full = get_version_latest_full();
- if ((latest_full > 0) && (latest_full > mon->monmap->get_epoch())) {
- bufferlist latest_bl;
- int err = get_version_full(latest_full, latest_bl);
- assert(err == 0);
- dout(7) << __func__ << " loading latest full monmap v"
- << latest_full << dendl;
- if (latest_bl.length() > 0)
- mon->monmap->decode(latest_bl);
+ if (need_bootstrap && version != mon->monmap->get_epoch()) {
+ dout(10) << " signaling that we need a bootstrap" << dendl;
+ *need_bootstrap = true;
}
- */
- bool need_restart = version != mon->monmap->get_epoch();
// read and decode
monmap_bl.clear();
@@ -106,15 +70,11 @@ void MonmapMonitor::update_from_paxos()
dout(10) << "update_from_paxos got " << version << dendl;
mon->monmap->decode(monmap_bl);
- if (exists_key("mfks", get_service_name())) {
+ if (exists_key("mkfs", get_service_name())) {
MonitorDBStore::Transaction t;
erase_mkfs(&t);
mon->store->apply_transaction(t);
}
-
- if (need_restart) {
- mon->bootstrap();
- }
}
void MonmapMonitor::create_pending()
@@ -434,7 +394,6 @@ bool MonmapMonitor::should_propose(double& delay)
void MonmapMonitor::tick()
{
- update_from_paxos();
}
void MonmapMonitor::get_health(list<pair<health_status_t, string> >& summary,
diff --git a/src/mon/MonmapMonitor.h b/src/mon/MonmapMonitor.h
index 0690fb7409f..198489d7017 100644
--- a/src/mon/MonmapMonitor.h
+++ b/src/mon/MonmapMonitor.h
@@ -46,7 +46,7 @@ class MonmapMonitor : public PaxosService {
void create_initial();
- void update_from_paxos();
+ void update_from_paxos(bool *need_bootstrap);
void create_pending();
diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index cf0e45d9a65..f1728c0240d 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -105,7 +105,7 @@ void OSDMonitor::create_initial()
newmap.encode(pending_inc.fullmap);
}
-void OSDMonitor::update_from_paxos()
+void OSDMonitor::update_from_paxos(bool *need_bootstrap)
{
version_t version = get_version();
if (version == osdmap.epoch)
@@ -1650,7 +1650,6 @@ void OSDMonitor::tick()
{
if (!is_active()) return;
- update_from_paxos();
dout(10) << osdmap << dendl;
if (!mon->is_leader()) return;
diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h
index ef5ba77462b..ab0962c0a6b 100644
--- a/src/mon/OSDMonitor.h
+++ b/src/mon/OSDMonitor.h
@@ -145,7 +145,7 @@ private:
public:
void create_initial();
private:
- void update_from_paxos();
+ void update_from_paxos(bool *need_bootstrap);
void create_pending(); // prepare a new pending
void encode_pending(MonitorDBStore::Transaction *t);
virtual void encode_full(MonitorDBStore::Transaction *t);
diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc
index 1178fcc381b..a9f3c1b6c5d 100644
--- a/src/mon/PGMonitor.cc
+++ b/src/mon/PGMonitor.cc
@@ -120,7 +120,6 @@ void PGMonitor::tick()
{
if (!is_active()) return;
- update_from_paxos();
handle_osd_timeouts();
if (mon->is_leader()) {
@@ -150,7 +149,7 @@ void PGMonitor::create_initial()
dout(10) << "create_initial -- creating initial map" << dendl;
}
-void PGMonitor::update_from_paxos()
+void PGMonitor::update_from_paxos(bool *need_bootstrap)
{
version_t version = get_version();
if (version == pg_map.version)
diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h
index e3afd0df247..b18b76b1288 100644
--- a/src/mon/PGMonitor.h
+++ b/src/mon/PGMonitor.h
@@ -51,7 +51,7 @@ private:
PGMap::Incremental pending_inc;
void create_initial();
- void update_from_paxos();
+ void update_from_paxos(bool *need_bootstrap);
void init();
void handle_osd_timeouts();
void create_pending(); // prepare a new pending
diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc
index 70f06870ec2..de6641663ca 100644
--- a/src/mon/Paxos.cc
+++ b/src/mon/Paxos.cc
@@ -431,11 +431,10 @@ void Paxos::handle_last(MMonPaxos *last)
if (uncommitted_v == last_committed+1 &&
uncommitted_value.length()) {
dout(10) << "that's everyone. begin on old learned value" << dendl;
- state = STATE_PREPARING | STATE_LOCKED;
+ state = STATE_UPDATING_PREVIOUS;
begin(uncommitted_value);
} else {
// active!
- state = STATE_ACTIVE;
dout(10) << "that's everyone. active!" << dendl;
extend_lease();
@@ -471,9 +470,7 @@ void Paxos::begin(bufferlist& v)
<< dendl;
assert(mon->is_leader());
- assert(is_preparing());
- state &= ~STATE_PREPARING;
- state |= STATE_UPDATING;
+ assert(is_updating() || is_updating_previous());
// we must already have a majority for this to work.
assert(mon->get_quorum().size() == 1 ||
@@ -508,13 +505,11 @@ void Paxos::begin(bufferlist& v)
if (mon->get_quorum().size() == 1) {
// we're alone, take it easy
commit();
- state = STATE_ACTIVE;
finish_proposal();
finish_contexts(g_ceph_context, waiting_for_active);
finish_contexts(g_ceph_context, waiting_for_commit);
finish_contexts(g_ceph_context, waiting_for_readable);
finish_contexts(g_ceph_context, waiting_for_writeable);
-
return;
}
@@ -604,7 +599,7 @@ void Paxos::handle_accept(MMonPaxos *accept)
assert(accept->last_committed == last_committed || // not committed
accept->last_committed == last_committed-1); // committed
- assert(is_updating());
+ assert(is_updating() || is_updating_previous());
assert(accepted.count(from) == 0);
accepted.insert(from);
dout(10) << " now " << accepted << " have accepted" << dendl;
@@ -625,9 +620,8 @@ void Paxos::handle_accept(MMonPaxos *accept)
accept_timeout_event = 0;
// yay!
- state = STATE_ACTIVE;
extend_lease();
-
+
finish_proposal();
// wake people up
@@ -644,7 +638,7 @@ void Paxos::accept_timeout()
dout(5) << "accept timeout, calling fresh election" << dendl;
accept_timeout_event = 0;
assert(mon->is_leader());
- assert(is_updating());
+ assert(is_updating() || is_updating_previous());
mon->bootstrap();
}
@@ -713,16 +707,18 @@ void Paxos::handle_commit(MMonPaxos *commit)
}
store_state(commit);
-
+
commit->put();
+ mon->refresh_from_paxos(NULL);
+
finish_contexts(g_ceph_context, waiting_for_commit);
}
void Paxos::extend_lease()
{
assert(mon->is_leader());
- assert(is_active());
+ //assert(is_active());
lease_expire = ceph_clock_now(g_ceph_context);
lease_expire += g_conf->mon_lease;
@@ -780,40 +776,34 @@ void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
}
-void Paxos::finish_queued_proposal()
+void Paxos::finish_proposal()
{
assert(mon->is_leader());
- assert(!proposals.empty());
- dout(10) << __func__ << " finishing proposal" << dendl;
- C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
- dout(10) << __func__ << " finish it (proposal = "
- << proposal << ")" << dendl;;
+ // make sure we have the latest state loaded up
+ bool need_bootstrap = false;
+ mon->refresh_from_paxos(&need_bootstrap);
- assert(proposal != NULL);
+ // ok, now go active!
+ state = STATE_ACTIVE;
- if (!proposal->proposed) {
- dout(10) << __func__ << " we must have received a stay message and we're "
- << "trying to finish before time. "
- << "Instead, propose it (if we are active)!" << dendl;
- } else {
- dout(10) << __func__ << " proposal took "
- << (ceph_clock_now(NULL) - proposal->proposal_time)
- << " to finish" << dendl;
+ // finish off the last proposal
+ if (!proposals.empty()) {
+ assert(mon->is_leader());
- proposals.pop_front();
- proposal->complete(0);
+ C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
+ if (!proposal->proposed) {
+ dout(10) << __func__ << " proposal " << proposal << ": we must have received a stay message and we're "
+ << "trying to finish before time. "
+ << "Instead, propose it (if we are active)!" << dendl;
+ } else {
+ dout(10) << __func__ << " proposal " << proposal << " took "
+ << (ceph_clock_now(NULL) - proposal->proposal_time)
+ << " to finish" << dendl;
+ proposals.pop_front();
+ proposal->complete(0);
+ }
}
-}
-
-void Paxos::finish_proposal()
-{
- /* There is a lot of debug still going around. We will get rid of it later
- * on, as soon as everything "just works (tm)"
- */
- assert(mon->is_leader());
- if (!proposals.empty())
- finish_queued_proposal();
dout(10) << __func__ << " state " << state
<< " proposals left " << proposals.size() << dendl;
@@ -830,6 +820,12 @@ void Paxos::finish_proposal()
first_committed = get_store()->get(get_name(), "first_committed");
last_committed = get_store()->get(get_name(), "last_committed");
+ if (need_bootstrap) {
+ dout(10) << " doing requested bootstrap" << dendl;
+ mon->bootstrap();
+ return;
+ }
+
if (should_trim()) {
trim();
}
@@ -1254,8 +1250,6 @@ void Paxos::propose_queued()
assert(is_active());
assert(!proposals.empty());
- state = STATE_PREPARING;
-
C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
assert(!proposal->proposed);
@@ -1268,6 +1262,7 @@ void Paxos::propose_queued()
list_proposals(*_dout);
*_dout << dendl;
+ state = STATE_UPDATING;
begin(proposal->bl);
}
diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h
index 04553776b93..4f1af82836e 100644
--- a/src/mon/Paxos.h
+++ b/src/mon/Paxos.h
@@ -163,24 +163,24 @@ public:
* @defgroup Paxos_h_states States on which the leader/peon may be.
* @{
*/
- /**
- * Leader/Peon is in Paxos' Recovery state
- */
- const static int STATE_RECOVERING = 0x01;
- /**
- * Leader/Peon is idle, and the Peon may or may not have a valid lease.
- */
- const static int STATE_ACTIVE = 0x02;
- /**
- * Leader/Peon is updating to a new value.
- */
- const static int STATE_UPDATING = 0x04;
- /**
- * Leader is about to propose a new value, but hasn't gotten to do it yet.
- */
- const static int STATE_PREPARING = 0x08;
-
- const static int STATE_LOCKED = 0x10;
+ enum {
+ /**
+ * Leader/Peon is in Paxos' Recovery state
+ */
+ STATE_RECOVERING,
+ /**
+ * Leader/Peon is idle, and the Peon may or may not have a valid lease.
+ */
+ STATE_ACTIVE,
+ /**
+ * Leader/Peon is updating to a new value.
+ */
+ STATE_UPDATING,
+ /*
+ * Leader proposing an old value
+ */
+ STATE_UPDATING_PREVIOUS,
+ };
/**
* Obtain state name from constant value.
@@ -192,26 +192,18 @@ public:
* @return The state's name.
*/
static const string get_statename(int s) {
- stringstream ss;
- if (s & STATE_RECOVERING) {
- ss << "recovering";
- assert(!(s & ~(STATE_RECOVERING|STATE_LOCKED)));
- } else if (s & STATE_ACTIVE) {
- ss << "active";
- assert(s == STATE_ACTIVE);
- } else if (s & STATE_UPDATING) {
- ss << "updating";
- assert(!(s & ~(STATE_UPDATING|STATE_LOCKED)));
- } else if (s & STATE_PREPARING) {
- ss << "preparing update";
- assert(!(s & ~(STATE_PREPARING|STATE_LOCKED)));
- } else {
- assert(0 == "We shouldn't have gotten here!");
+ switch (s) {
+ case STATE_RECOVERING:
+ return "recovering";
+ case STATE_ACTIVE:
+ return "active";
+ case STATE_UPDATING:
+ return "updating";
+ case STATE_UPDATING_PREVIOUS:
+ return "updating-previous";
+ default:
+ return "UNKNOWN";
}
-
- if (s & STATE_LOCKED)
- ss << " (locked)";
- return ss.str();
}
private:
@@ -241,10 +233,13 @@ public:
*
* @return 'true' if we are on the Updating state; 'false' otherwise.
*/
- bool is_updating() const { return (state & STATE_UPDATING); }
+ bool is_updating() const { return state == STATE_UPDATING; }
- bool is_preparing() const { return (state & STATE_PREPARING); }
- bool is_locked() const { return (state & STATE_LOCKED); }
+ /**
+ * Check if we are updating/proposing a previous value from a
+ * previous quorum
+ */
+ bool is_updating_previous() const { return state == STATE_UPDATING_PREVIOUS; }
private:
/**
@@ -994,7 +989,6 @@ private:
* Begin proposing the Proposal at the front of the proposals queue.
*/
void propose_queued();
- void finish_queued_proposal();
void finish_proposal();
public:
diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc
index 79ea0d41281..fb704ca7a97 100644
--- a/src/mon/PaxosService.cc
+++ b/src/mon/PaxosService.cc
@@ -24,11 +24,12 @@
#define dout_subsys ceph_subsys_paxos
#undef dout_prefix
-#define dout_prefix _prefix(_dout, mon, paxos, service_name)
-static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string service_name) {
+#define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed())
+static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string service_name,
+ version_t fc, version_t lc) {
return *_dout << "mon." << mon->name << "@" << mon->rank
<< "(" << mon->get_state_name()
- << ").paxosservice(" << service_name << ") ";
+ << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") ";
}
bool PaxosService::dispatch(PaxosServiceMessage *m)
@@ -68,9 +69,6 @@ bool PaxosService::dispatch(PaxosServiceMessage *m)
return true;
}
- // make sure service has latest from paxos.
- update_from_paxos();
-
// preprocess
if (preprocess_query(m))
return true; // easy!
@@ -97,11 +95,11 @@ bool PaxosService::dispatch(PaxosServiceMessage *m)
} else {
// delay a bit
if (!proposal_timer) {
- dout(10) << " setting propose timer with delay of " << delay << dendl;
proposal_timer = new C_Propose(this);
+ dout(10) << " setting proposal_timer " << proposal_timer << " with delay of " << delay << dendl;
mon->timer.add_event_after(delay, proposal_timer);
} else {
- dout(10) << " propose timer already set" << dendl;
+ dout(10) << " proposal_timer already set" << dendl;
}
}
} else {
@@ -111,6 +109,18 @@ bool PaxosService::dispatch(PaxosServiceMessage *m)
return true;
}
+void PaxosService::refresh(bool *need_bootstrap)
+{
+ // update cached versions
+ cached_first_committed = mon->store->get(get_service_name(), first_committed_name);
+ cached_last_committed = mon->store->get(get_service_name(), last_committed_name);
+
+ dout(10) << __func__ << dendl;
+
+ update_from_paxos(need_bootstrap);
+}
+
+
void PaxosService::scrub()
{
dout(10) << __func__ << dendl;
@@ -158,8 +168,9 @@ void PaxosService::propose_pending()
return;
if (proposal_timer) {
+ dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
mon->timer.cancel_event(proposal_timer);
- proposal_timer = 0;
+ proposal_timer = NULL;
}
/**
@@ -214,6 +225,7 @@ void PaxosService::restart()
{
dout(10) << "restart" << dendl;
if (proposal_timer) {
+ dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
mon->timer.cancel_event(proposal_timer);
proposal_timer = 0;
}
@@ -228,6 +240,7 @@ void PaxosService::election_finished()
dout(10) << "election_finished" << dendl;
if (proposal_timer) {
+ dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
mon->timer.cancel_event(proposal_timer);
proposal_timer = 0;
}
@@ -256,9 +269,6 @@ void PaxosService::_active()
}
dout(10) << "_active" << dendl;
- // pull latest from paxos
- update_from_paxos();
-
scrub();
// create pending state?
@@ -300,6 +310,7 @@ void PaxosService::shutdown()
cancel_events();
if (proposal_timer) {
+ dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
mon->timer.cancel_event(proposal_timer);
proposal_timer = 0;
}
diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h
index a49e60a6fdc..73a7bb56efa 100644
--- a/src/mon/PaxosService.h
+++ b/src/mon/PaxosService.h
@@ -197,7 +197,8 @@ public:
first_committed_name("first_committed"),
last_accepted_name("last_accepted"),
mkfs_name("mkfs"),
- full_version_name("full"), full_latest_name("latest")
+ full_version_name("full"), full_latest_name("latest"),
+ cached_first_committed(0), cached_last_committed(0)
{
}
@@ -313,6 +314,8 @@ public:
*/
bool dispatch(PaxosServiceMessage *m);
+ void refresh(bool *need_bootstrap);
+
/**
* @defgroup PaxosService_h_override_funcs Functions that should be
* overridden.
@@ -335,7 +338,7 @@ public:
*
* @returns 'true' on success; 'false' otherwise.
*/
- virtual void update_from_paxos() = 0;
+ virtual void update_from_paxos(bool *need_bootstrap) = 0;
/**
* Init on startup
@@ -472,6 +475,22 @@ public:
*/
/**
+ * @defgroup PaxosService_h_version_cache Variables holding cached values
+ * for the most used versions (first
+ * and last committed); we only have
+ * to read them when the store is
+ * updated, so in-between updates we
+ * may very well use cached versions
+ * and avoid the overhead.
+ * @{
+ */
+ version_t cached_first_committed;
+ version_t cached_last_committed;
+ /**
+ * @}
+ */
+
+ /**
* Callback list to be used whenever we are running a proposal through
* Paxos. These callbacks will be awaken whenever the said proposal
* finishes.
@@ -497,35 +516,27 @@ public:
* @returns true if in state ACTIVE; false otherwise.
*/
bool is_active() {
- return (!is_proposing() && !paxos->is_recovering()
- && !paxos->is_locked());
+ return
+ !is_proposing() &&
+ (paxos->is_active() || paxos->is_updating());
}
/**
* Check if we are readable.
*
- * We consider that a given version @p ver is readable if:
+ * This mirrors on the paxos check, except that we also verify that
*
- * - it exists (i.e., is lower than the last committed version);
- * - we have at least one committed version (i.e., last committed version
- * is greater than zero);
- * - our monitor is a member of the cluster (either a peon or the leader);
- * - we are not proposing a new version;
- * - the Paxos is not recovering;
- * - we either belong to a quorum and have a valid lease, or we belong to
- * a quorum of one.
+ * - the client hasn't seen the future relative to this PaxosService
+ * - this service isn't proposing.
*
* @param ver The version we want to check if is readable
* @returns true if it is readable; false otherwise
*/
bool is_readable(version_t ver = 0) {
- if ((ver > get_last_committed())
- || ((!mon->is_peon() && !mon->is_leader()))
- || (is_proposing() || paxos->is_recovering() || paxos->is_locked())
- || (get_last_committed() <= 0)
- || ((mon->get_quorum().size() != 1) && !paxos->is_lease_valid())) {
+ if (ver > get_last_committed() ||
+ is_proposing() ||
+ !paxos->is_readable(0))
return false;
- }
return true;
}
@@ -535,19 +546,16 @@ public:
* We consider to be writeable iff:
*
* - we are not proposing a new version;
- * - our monitor is the leader;
- * - we have a valid lease;
- * - Paxos is not boostrapping.
- * - Paxos is not recovering.
* - we are ready to be written to -- i.e., we have a pending value.
+ * - paxos is writeable
*
* @returns true if writeable; false otherwise
*/
bool is_writeable() {
- return (is_active()
- && mon->is_leader()
- && paxos->is_lease_valid()
- && is_write_ready());
+ return
+ !is_proposing() &&
+ is_write_ready() &&
+ paxos->is_writeable();
}
/**
@@ -598,8 +606,8 @@ public:
* Paxos; otherwise, we may assert on Paxos::wait_for_readable() if it
* happens to be readable at that specific point in time.
*/
- if (is_proposing() || (ver > get_last_committed())
- || (get_last_committed() <= 0))
+ if (is_proposing() ||
+ ver > get_last_committed())
wait_for_finished_proposal(c);
else
paxos->wait_for_readable(c);
@@ -611,12 +619,12 @@ public:
* @param c The callback to be awaken once we become writeable.
*/
void wait_for_writeable(Context *c) {
- if (!is_proposing()) {
+ if (is_proposing())
+ wait_for_finished_proposal(c);
+ else if (!is_write_ready())
+ wait_for_active(c);
+ else
paxos->wait_for_writeable(c);
- return;
- }
-
- wait_for_finished_proposal(c);
}
/**
@@ -872,13 +880,19 @@ public:
* the back store for reading purposes
* @{
*/
+
+ /**
+ * @defgroup PaxosService_h_version_cache Obtain cached versions for this
+ * service.
+ * @{
+ */
/**
* Get the first committed version
*
* @returns Our first committed version (that is available)
*/
version_t get_first_committed() {
- return mon->store->get(get_service_name(), first_committed_name);
+ return cached_first_committed;
}
/**
* Get the last committed version
@@ -886,7 +900,7 @@ public:
* @returns Our last committed version
*/
version_t get_last_committed() {
- return mon->store->get(get_service_name(), last_committed_name);
+ return cached_last_committed;
}
/**
* Get our current version
@@ -896,6 +910,11 @@ public:
version_t get_version() {
return get_last_committed();
}
+
+ /**
+ * @}
+ */
+
/**
* Get the contents of a given version @p ver
*