summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Durgin <josh.durgin@inktank.com>2013-04-10 10:45:07 -0700
committerJosh Durgin <josh.durgin@inktank.com>2013-04-10 10:45:07 -0700
commit1ef0ecfcb40970641784c1ba214dd5a72ff6d5e3 (patch)
tree4a044a8ac71e161ab91224424c8eb4ae77557721
parentdc1af54cc50b7d042d3d3bf92e51297ec96d9743 (diff)
parent3888a12385aa6fcf35c9cdce9ad82a2cdd3377b7 (diff)
downloadceph-1ef0ecfcb40970641784c1ba214dd5a72ff6d5e3.tar.gz
Merge branch 'next'
-rw-r--r--src/ceph_mon.cc6
-rw-r--r--src/ceph_osd.cc14
-rw-r--r--src/common/ceph_argparse.cc52
-rw-r--r--src/common/ceph_argparse.h5
-rw-r--r--src/common/config_opts.h1
-rw-r--r--src/global/global_init.cc1
-rw-r--r--src/mds/Server.cc3
-rw-r--r--src/mon/MDSMonitor.cc3
-rw-r--r--src/mon/Monitor.cc2
-rw-r--r--src/mon/MonmapMonitor.cc3
-rw-r--r--src/mon/OSDMonitor.cc3
-rw-r--r--src/mon/PGMonitor.cc7
-rw-r--r--src/msg/Message.h50
-rw-r--r--src/msg/Messenger.h13
-rw-r--r--src/msg/Pipe.cc46
-rw-r--r--src/msg/SimpleMessenger.h13
-rw-r--r--src/os/FileJournal.cc4
-rw-r--r--src/os/FileJournal.h17
-rw-r--r--src/osdc/Journaler.cc18
-rw-r--r--src/osdc/Journaler.h4
-rw-r--r--src/test/cli-integration/rbd/formatted-output.t16
-rw-r--r--src/test/mon/test_mon_workloadgen.cc4
22 files changed, 199 insertions, 86 deletions
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc
index 4a4df8942e9..72354e18876 100644
--- a/src/ceph_mon.cc
+++ b/src/ceph_mon.cc
@@ -407,15 +407,15 @@ int main(int argc, const char **argv)
// throttle client traffic
Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes",
g_conf->mon_client_bytes);
- messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, client_throttler);
+ messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, client_throttler, NULL);
// throttle daemon traffic
// NOTE: actual usage on the leader may multiply by the number of
// monitors if they forward large update messages from daemons.
Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes",
g_conf->mon_daemon_bytes);
- messenger->set_policy_throttler(entity_name_t::TYPE_OSD, daemon_throttler);
- messenger->set_policy_throttler(entity_name_t::TYPE_MDS, daemon_throttler);
+ messenger->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler, NULL);
+ messenger->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler, NULL);
cout << "starting " << g_conf->name << " rank " << rank
<< " at " << ipaddr
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc
index 5a90abd6125..33a107c1dc0 100644
--- a/src/ceph_osd.cc
+++ b/src/ceph_osd.cc
@@ -338,9 +338,12 @@ int main(int argc, const char **argv)
"(no journal)" : g_conf->osd_journal)
<< std::endl;
- boost::scoped_ptr<Throttle> client_throttler(
+ boost::scoped_ptr<Throttle> client_byte_throttler(
new Throttle(g_ceph_context, "osd_client_bytes",
g_conf->osd_client_message_size_cap));
+ boost::scoped_ptr<Throttle> client_msg_throttler(
+ new Throttle(g_ceph_context, "osd_client_messages",
+ g_conf->osd_client_message_cap));
uint64_t supported =
CEPH_FEATURE_UID |
@@ -349,9 +352,9 @@ int main(int argc, const char **argv)
CEPH_FEATURE_MSG_AUTH;
client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
- client_messenger->set_policy_throttler(
- entity_name_t::TYPE_CLIENT,
- client_throttler.get()); // default, actually
+ client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+ client_byte_throttler.get(),
+ client_msg_throttler.get());
client_messenger->set_policy(entity_name_t::TYPE_MON,
Messenger::Policy::lossy_client(supported,
CEPH_FEATURE_UID |
@@ -462,7 +465,8 @@ int main(int argc, const char **argv)
delete messenger_hbclient;
delete messenger_hbserver;
delete cluster_messenger;
- client_throttler.reset();
+ client_byte_throttler.reset();
+ client_msg_throttler.reset();
g_ceph_context->put();
// cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
diff --git a/src/common/ceph_argparse.cc b/src/common/ceph_argparse.cc
index 889c713d736..2ff520e45c9 100644
--- a/src/common/ceph_argparse.cc
+++ b/src/common/ceph_argparse.cc
@@ -252,7 +252,8 @@ bool ceph_argparse_binary_flag(std::vector<const char*> &args,
}
static bool va_ceph_argparse_witharg(std::vector<const char*> &args,
- std::vector<const char*>::iterator &i, std::string *ret, va_list ap)
+ std::vector<const char*>::iterator &i, std::string *ret, bool cli,
+ va_list ap)
{
const char *first = *i;
char tmp[strlen(first)+1];
@@ -278,8 +279,12 @@ static bool va_ceph_argparse_witharg(std::vector<const char*> &args,
else if (first[strlen_a] == '\0') {
// find second part (or not)
if (i+1 == args.end()) {
- cerr << "Option " << *i << " requires an argument." << std::endl;
- _exit(1);
+ if (cli) {
+ cerr << "Option " << *i << " requires an argument." << std::endl;
+ _exit(1);
+ } else {
+ return false;
+ }
}
i = args.erase(i);
*ret = *i;
@@ -296,11 +301,21 @@ bool ceph_argparse_witharg(std::vector<const char*> &args,
bool r;
va_list ap;
va_start(ap, ret);
- r = va_ceph_argparse_witharg(args, i, ret, ap);
+ r = va_ceph_argparse_witharg(args, i, ret, false, ap);
va_end(ap);
return r;
}
+bool ceph_argparse_witharg_daemon(std::vector<const char*> &args,
+ std::vector<const char*>::iterator &i, std::string *ret, ...)
+{
+ bool r;
+ va_list ap;
+ va_start(ap, ret);
+ r = va_ceph_argparse_witharg(args, i, ret, false, ap);
+ va_end(ap);
+ return r;
+}
bool ceph_argparse_withint(std::vector<const char*> &args,
std::vector<const char*>::iterator &i, int *ret,
std::ostream *oss, ...)
@@ -309,7 +324,30 @@ bool ceph_argparse_withint(std::vector<const char*> &args,
va_list ap;
std::string str;
va_start(ap, oss);
- r = va_ceph_argparse_witharg(args, i, &str, ap);
+ r = va_ceph_argparse_witharg(args, i, &str, true, ap);
+ va_end(ap);
+ if (!r) {
+ return false;
+ }
+
+ std::string err;
+ int myret = strict_strtol(str.c_str(), 10, &err);
+ *ret = myret;
+ if (!err.empty()) {
+ *oss << err;
+ }
+ return true;
+}
+
+bool ceph_argparse_withint_daemon(std::vector<const char*> &args,
+ std::vector<const char*>::iterator &i, int *ret,
+ std::ostream *oss, ...)
+{
+ bool r;
+ va_list ap;
+ std::string str;
+ va_start(ap, oss);
+ r = va_ceph_argparse_witharg(args, i, &str, false, ap);
va_end(ap);
if (!r) {
return false;
@@ -332,7 +370,7 @@ bool ceph_argparse_withlonglong(std::vector<const char*> &args,
va_list ap;
std::string str;
va_start(ap, oss);
- r = va_ceph_argparse_witharg(args, i, &str, ap);
+ r = va_ceph_argparse_witharg(args, i, &str, false, ap);
va_end(ap);
if (!r) {
return false;
@@ -355,7 +393,7 @@ bool ceph_argparse_withfloat(std::vector<const char*> &args,
va_list ap;
std::string str;
va_start(ap, oss);
- r = va_ceph_argparse_witharg(args, i, &str, ap);
+ r = va_ceph_argparse_witharg(args, i, &str, false, ap);
va_end(ap);
if (!r) {
return false;
diff --git a/src/common/ceph_argparse.h b/src/common/ceph_argparse.h
index 3ef0251abeb..5249704c0b8 100644
--- a/src/common/ceph_argparse.h
+++ b/src/common/ceph_argparse.h
@@ -57,6 +57,8 @@ bool ceph_argparse_flag(std::vector<const char*> &args,
std::vector<const char*>::iterator &i, ...);
bool ceph_argparse_witharg(std::vector<const char*> &args,
std::vector<const char*>::iterator &i, std::string *ret, ...);
+bool ceph_argparse_witharg_daemon(std::vector<const char*> &args,
+ std::vector<const char*>::iterator &i, std::string *ret, ...);
bool ceph_argparse_binary_flag(std::vector<const char*> &args,
std::vector<const char*>::iterator &i, int *ret,
std::ostream *oss, ...);
@@ -66,6 +68,9 @@ extern CephInitParameters ceph_argparse_early_args
extern bool ceph_argparse_withint(std::vector<const char*> &args,
std::vector<const char*>::iterator &i, int *ret,
std::ostream *oss, ...);
+extern bool ceph_argparse_withint_daemon(std::vector<const char*> &args,
+ std::vector<const char*>::iterator &i, int *ret,
+ std::ostream *oss, ...);
extern bool ceph_argparse_withfloat(std::vector<const char*> &args,
std::vector<const char*>::iterator &i, float *ret,
std::ostream *oss, ...);
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index cb2fd391fc9..19f3082ec4b 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -349,6 +349,7 @@ OPTION(osd_journal_size, OPT_INT, 5120) // in mb
OPTION(osd_max_write_size, OPT_INT, 90)
OPTION(osd_max_pgls, OPT_U64, 1024) // max number of pgls entries to return
OPTION(osd_client_message_size_cap, OPT_U64, 500*1024L*1024L) // client data allowed in-memory (in bytes)
+OPTION(osd_client_message_cap, OPT_U64, 100) // num client messages allowed in-memory
OPTION(osd_pg_bits, OPT_INT, 6) // bits per osd
OPTION(osd_pgp_bits, OPT_INT, 6) // bits per osd
OPTION(osd_crush_chooseleaf_type, OPT_INT, 1) // 1 = host
diff --git a/src/global/global_init.cc b/src/global/global_init.cc
index 43ce0909565..0f7179c39df 100644
--- a/src/global/global_init.cc
+++ b/src/global/global_init.cc
@@ -148,6 +148,7 @@ void global_init_daemonize(CephContext *cct, int flags)
return;
// stop log thread
+ g_ceph_context->_log->flush();
g_ceph_context->_log->stop();
int ret = daemon(1, 1);
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index 293640e4870..dc7ea23f763 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -2684,11 +2684,12 @@ public:
mds->balancer->hit_inode(mdr->now, newi, META_POP_IWR);
+ mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool);
+
MClientReply *reply = new MClientReply(mdr->client_request, 0);
reply->set_extra_bl(mdr->reply_extra_bl);
mds->server->reply_request(mdr, reply);
- mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool);
assert(g_conf->mds_kill_openc_at != 1);
}
};
diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc
index 0ca0f78bf45..3ea6d860039 100644
--- a/src/mon/MDSMonitor.cc
+++ b/src/mon/MDSMonitor.cc
@@ -548,7 +548,8 @@ bool MDSMonitor::preprocess_command(MMonCommand *m)
for (std::vector<const char*>::iterator i = args.begin()+1; i != args.end(); ) {
if (ceph_argparse_double_dash(args, i))
break;
- else if (ceph_argparse_witharg(args, i, &val, "-f", "--format", (char*)NULL))
+ else if (ceph_argparse_witharg_daemon(args, i, &val, "-f", "--format",
+ (char*)NULL))
format = val;
else if (!epoch) {
long l = parse_pos_long(*i++, &ss);
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index d8d4579f67b..a49d08a02af 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -2483,7 +2483,7 @@ void Monitor::handle_command(MMonCommand *m)
JSONFormatter *jf = NULL;
for (vector<const char*>::iterator i = args.begin(); i != args.end();) {
string val;
- if (ceph_argparse_witharg(args, i, &val,
+ if (ceph_argparse_witharg_daemon(args, i, &val,
"-f", "--format", (char*)NULL)) {
format = val;
} else {
diff --git a/src/mon/MonmapMonitor.cc b/src/mon/MonmapMonitor.cc
index af8d7299227..f1c6dfa3325 100644
--- a/src/mon/MonmapMonitor.cc
+++ b/src/mon/MonmapMonitor.cc
@@ -222,7 +222,8 @@ bool MonmapMonitor::preprocess_command(MMonCommand *m)
for (std::vector<const char*>::iterator i = args.begin()+1; i != args.end(); ) {
if (ceph_argparse_double_dash(args, i))
break;
- else if (ceph_argparse_witharg(args, i, &val, "-f", "--format", (char*)NULL))
+ else if (ceph_argparse_witharg_daemon(args, i, &val, "-f", "--format",
+ (char*)NULL))
format = val;
else if (!epoch) {
long l = parse_pos_long(*i++, &ss);
diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index a9d68075cd4..40103ec402f 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -1894,7 +1894,8 @@ bool OSDMonitor::preprocess_command(MMonCommand *m)
for (std::vector<const char*>::iterator i = args.begin()+1; i != args.end(); ) {
if (ceph_argparse_double_dash(args, i))
break;
- else if (ceph_argparse_witharg(args, i, &val, "-f", "--format", (char*)NULL))
+ else if (ceph_argparse_witharg_daemon(args, i, &val, "-f", "--format",
+ (char*)NULL))
format = val;
else if (!epoch) {
long l = parse_pos_long(*i++, &ss);
diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc
index c05897215cc..fe50b34da8c 100644
--- a/src/mon/PGMonitor.cc
+++ b/src/mon/PGMonitor.cc
@@ -1096,7 +1096,8 @@ bool PGMonitor::preprocess_command(MMonCommand *m)
for (std::vector<const char*>::iterator i = args.begin()+1; i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
break;
- } else if (ceph_argparse_witharg(args, i, &val, "-f", "--format", (char*)NULL)) {
+ } else if (ceph_argparse_witharg_daemon(args, i, &val, "-f",
+ "--format", (char*)NULL)) {
format = val;
} else {
what = *i++;
@@ -1572,14 +1573,14 @@ int PGMonitor::dump_stuck_pg_stats(ostream& ss,
i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
break;
- } else if (ceph_argparse_witharg(args, i, &val,
+ } else if (ceph_argparse_witharg_daemon(args, i, &val,
"-f", "--format", (char*)NULL)) {
if (val != "json" && val != "plain") {
ss << "format must be json or plain";
return -EINVAL;
}
format = val;
- } else if (ceph_argparse_withint(args, i, &seconds, &err,
+ } else if (ceph_argparse_withint_daemon(args, i, &seconds, &err,
"-t", "--threshold", (char*)NULL)) {
if (!err.str().empty()) {
ss << err.str();
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 1bf28e36f2d..33d26b2e7da 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -299,7 +299,10 @@ protected:
// release our size in bytes back to this throttler when our payload
// is adjusted or when we are destroyed.
- Throttle *throttler;
+ Throttle *byte_throttler;
+
+ // release a count back to this throttler when we are destroyed
+ Throttle *msg_throttler;
// keep track of how big this message was when we reserved space in
// the msgr dispatch_throttler, so that we can properly release it
@@ -313,14 +316,16 @@ protected:
public:
Message()
: connection(NULL),
- throttler(NULL),
+ byte_throttler(NULL),
+ msg_throttler(NULL),
dispatch_throttle_size(0) {
memset(&header, 0, sizeof(header));
memset(&footer, 0, sizeof(footer));
};
Message(int t, int version=1, int compat_version=0)
: connection(NULL),
- throttler(NULL),
+ byte_throttler(NULL),
+ msg_throttler(NULL),
dispatch_throttle_size(0) {
memset(&header, 0, sizeof(header));
header.type = t;
@@ -340,8 +345,10 @@ protected:
assert(nref.read() == 0);
if (connection)
connection->put();
- if (throttler)
- throttler->put(payload.length() + middle.length() + data.length());
+ if (byte_throttler)
+ byte_throttler->put(payload.length() + middle.length() + data.length());
+ if (msg_throttler)
+ msg_throttler->put();
}
public:
Connection *get_connection() { return connection; }
@@ -350,8 +357,10 @@ public:
connection->put();
connection = c;
}
- void set_throttler(Throttle *t) { throttler = t; }
- Throttle *get_throttler() { return throttler; }
+ void set_byte_throttler(Throttle *t) { byte_throttler = t; }
+ Throttle *get_byte_throttler() { return byte_throttler; }
+ void set_message_throttler(Throttle *t) { msg_throttler = t; }
+ Throttle *get_message_throttler() { return msg_throttler; }
void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
uint64_t get_dispatch_throttle_size() { return dispatch_throttle_size; }
@@ -369,39 +378,48 @@ public:
*/
void clear_payload() {
- if (throttler) throttler->put(payload.length() + middle.length());
+ if (byte_throttler)
+ byte_throttler->put(payload.length() + middle.length());
payload.clear();
middle.clear();
}
void clear_data() {
- if (throttler) throttler->put(data.length());
+ if (byte_throttler)
+ byte_throttler->put(data.length());
data.clear();
}
bool empty_payload() { return payload.length() == 0; }
bufferlist& get_payload() { return payload; }
void set_payload(bufferlist& bl) {
- if (throttler) throttler->put(payload.length());
+ if (byte_throttler)
+ byte_throttler->put(payload.length());
payload.claim(bl);
- if (throttler) throttler->take(payload.length());
+ if (byte_throttler)
+ byte_throttler->take(payload.length());
}
void set_middle(bufferlist& bl) {
- if (throttler) throttler->put(payload.length());
+ if (byte_throttler)
+ byte_throttler->put(payload.length());
middle.claim(bl);
- if (throttler) throttler->take(payload.length());
+ if (byte_throttler)
+ byte_throttler->take(payload.length());
}
bufferlist& get_middle() { return middle; }
void set_data(const bufferlist &d) {
- if (throttler) throttler->put(data.length());
+ if (byte_throttler)
+ byte_throttler->put(data.length());
data = d;
- if (throttler) throttler->take(data.length());
+ if (byte_throttler)
+ byte_throttler->take(data.length());
}
bufferlist& get_data() { return data; }
void claim_data(bufferlist& bl) {
- if (throttler) throttler->put(data.length());
+ if (byte_throttler)
+ byte_throttler->put(data.length());
bl.claim(data);
}
off_t get_data_len() { return data.length(); }
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index 7205940c118..b08fdaa7f30 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -74,7 +74,8 @@ public:
* the associated Connection(s). When reading in a new Message, the Messenger
* will call throttler->throttle() for the size of the new Message.
*/
- Throttle *throttler;
+ Throttle *throttler_bytes;
+ Throttle *throttler_messages;
/// Specify features supported locally by the endpoint.
uint64_t features_supported;
@@ -82,12 +83,16 @@ public:
uint64_t features_required;
Policy()
- : lossy(false), server(false), standby(false), resetcheck(true), throttler(NULL),
+ : lossy(false), server(false), standby(false), resetcheck(true),
+ throttler_bytes(NULL),
+ throttler_messages(NULL),
features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
features_required(0) {}
private:
Policy(bool l, bool s, bool st, bool r, uint64_t sup, uint64_t req)
- : lossy(l), server(s), standby(st), resetcheck(r), throttler(NULL),
+ : lossy(l), server(s), standby(st), resetcheck(r),
+ throttler_bytes(NULL),
+ throttler_messages(NULL),
features_supported(sup | CEPH_FEATURES_SUPPORTED_DEFAULT),
features_required(req) {}
@@ -266,7 +271,7 @@ public:
* ownership of this pointer, but you must not destroy it before
* you destroy the Messenger.
*/
- virtual void set_policy_throttler(int type, Throttle *t) = 0;
+ virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
/**
* Set the default send priority
*
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index ae94a6a340c..f4100bc483b 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -311,6 +311,10 @@ int Pipe::accept()
// http://ceph.newdream.net/wiki/Messaging_protocol
int reply_tag = 0;
uint64_t existing_seq = -1;
+
+ // used for reading in the remote acked seq on connect
+ uint64_t newly_acked_seq = 0;
+
while (1) {
if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
@@ -639,7 +643,6 @@ int Pipe::accept()
}
if (reply_tag == CEPH_MSGR_TAG_SEQ) {
- uint64_t newly_acked_seq = 0;
if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
goto fail_registered;
@@ -648,10 +651,10 @@ int Pipe::accept()
ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
goto fail_registered;
}
- discard_requeued_up_to(newly_acked_seq);
}
pipe_lock.Lock();
+ discard_requeued_up_to(newly_acked_seq);
if (state != STATE_CLOSED) {
ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
start_writer();
@@ -1661,14 +1664,20 @@ int Pipe::read_message(Message **pm)
Message *message;
utime_t recv_stamp = ceph_clock_now(msgr->cct);
+ if (policy.throttler_messages) {
+ ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
+ << policy.throttler_messages->get_current() << "/"
+ << policy.throttler_messages->get_max() << dendl;
+ policy.throttler_messages->get();
+ }
+
uint64_t message_size = header.front_len + header.middle_len + header.data_len;
if (message_size) {
- bool waited_on_throttle = false;
- if (policy.throttler) {
- ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler "
- << policy.throttler->get_current() << "/"
- << policy.throttler->get_max() << dendl;
- waited_on_throttle = policy.throttler->get(message_size);
+ if (policy.throttler_bytes) {
+ ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
+ << policy.throttler_bytes->get_current() << "/"
+ << policy.throttler_bytes->get_max() << dendl;
+ policy.throttler_bytes->get(message_size);
}
// throttle total bytes waiting for dispatch. do this _after_ the
@@ -1678,7 +1687,7 @@ int Pipe::read_message(Message **pm)
ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
<< msgr->dispatch_throttler.get_current() << "/"
<< msgr->dispatch_throttler.get_max() << dendl;
- waited_on_throttle |= msgr->dispatch_throttler.get(message_size);
+ msgr->dispatch_throttler.get(message_size);
}
utime_t throttle_stamp = ceph_clock_now(msgr->cct);
@@ -1807,7 +1816,8 @@ int Pipe::read_message(Message **pm)
}
}
- message->set_throttler(policy.throttler);
+ message->set_byte_throttler(policy.throttler_bytes);
+ message->set_message_throttler(policy.throttler_messages);
// store reservation size in message, so we don't get confused
// by messages entering the dispatch queue through other paths.
@@ -1822,12 +1832,18 @@ int Pipe::read_message(Message **pm)
out_dethrottle:
// release bytes reserved from the throttlers on failure
+ if (policy.throttler_messages) {
+ ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
+ << policy.throttler_messages->get_current() << "/"
+ << policy.throttler_messages->get_max() << dendl;
+ policy.throttler_messages->put();
+ }
if (message_size) {
- if (policy.throttler) {
- ldout(msgr->cct,10) << "reader releasing " << message_size << " to policy throttler "
- << policy.throttler->get_current() << "/"
- << policy.throttler->get_max() << dendl;
- policy.throttler->put(message_size);
+ if (policy.throttler_bytes) {
+ ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
+ << policy.throttler_bytes->get_current() << "/"
+ << policy.throttler_bytes->get_max() << dendl;
+ policy.throttler_bytes->put(message_size);
}
msgr->dispatch_throttle_release(message_size);
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index cc946e3d25a..d837a4496ae 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -162,12 +162,15 @@ public:
* ownership of this pointer, but you must not destroy it before
* you destroy SimpleMessenger.
*/
- void set_policy_throttler(int type, Throttle *t) {
+ void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) {
Mutex::Locker l(policy_lock);
- if (policy_map.count(type))
- policy_map[type].throttler = t;
- else
- default_policy.throttler = t;
+ if (policy_map.count(type)) {
+ policy_map[type].throttler_bytes = byte_throttle;
+ policy_map[type].throttler_messages = msg_throttle;
+ } else {
+ default_policy.throttler_bytes = byte_throttle;
+ default_policy.throttler_messages = msg_throttle;
+ }
}
/**
* Bind the SimpleMessenger to a specific address. If bind_addr
diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc
index f5e8f34137e..29139c12bca 100644
--- a/src/os/FileJournal.cc
+++ b/src/os/FileJournal.cc
@@ -389,9 +389,7 @@ int FileJournal::create()
header.alignment = 16; // at least stay word aligned on 64bit machines...
header.start = get_top();
- /* FileStore::mkfs initializes the fs op sequence file at 1. Therefore,
- * the first entry written must be at sequence 2.*/
- header.start_seq = 2;
+ header.start_seq = 0;
print_header();
diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h
index 7b1777928cd..0e826fb4940 100644
--- a/src/os/FileJournal.h
+++ b/src/os/FileJournal.h
@@ -109,7 +109,22 @@ public:
int64_t max_size; // max size of journal ring buffer
int64_t start; // offset of first entry
uint64_t committed_up_to; // committed up to
- uint64_t start_seq; // entry at header.start
+
+ /**
+ * start_seq
+ *
+ * entry at header.start has sequence >= start_seq
+ *
+ * Generally, the entry at header.start will have sequence
+ * start_seq if it exists. The only exception is immediately
+ * after journal creation since the first sequence number is
+ * not known.
+ *
+ * If the first read on open fails, we can assume corruption
+ * if start_seq > committed_up_thru because the entry would have
+ * a sequence >= start_seq and therefore > committed_up_thru.
+ */
+ uint64_t start_seq;
header_t() :
flags(0), block_size(0), alignment(0), max_size(0), start(0),
diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc
index 544ac8c49c4..cd9b9edc4c7 100644
--- a/src/osdc/Journaler.cc
+++ b/src/osdc/Journaler.cc
@@ -64,7 +64,6 @@ void Journaler::set_layout(ceph_file_layout *l)
if (periods < 2)
periods = 2; // we need at least 2 periods to make progress.
fetch_len = layout.fl_stripe_count * layout.fl_object_size * periods;
- prefetch_from = fetch_len / 2;
}
@@ -835,6 +834,7 @@ void Journaler::_issue_read(uint64_t len)
void Journaler::_prefetch()
{
+ ldout(cct, 10) << "_prefetch" << dendl;
// prefetch
uint64_t pf;
if (temp_fetch_len) {
@@ -847,9 +847,11 @@ void Journaler::_prefetch()
uint64_t raw_target = read_pos + pf;
- // only read full log segments
+ // read full log segments, so increase if necessary
uint64_t period = get_layout_period();
- uint64_t target = raw_target - (raw_target % period);
+ uint64_t remainder = raw_target % period;
+ uint64_t adjustment = remainder ? period - remainder : 0;
+ uint64_t target = raw_target + adjustment;
// don't read past the log tail
if (target > write_pos)
@@ -883,7 +885,9 @@ bool Journaler::_is_readable()
read_buf.length() >= sizeof(s) + s)
return true; // yep, next entry is ready.
- // darn it!
+ ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
+ << ", but need " << s + sizeof(s)
+ << " for next entry; fetch_len is " << fetch_len << dendl;
// partial fragment at the end?
if (received_pos == write_pos) {
@@ -902,12 +906,14 @@ bool Journaler::_is_readable()
return false;
}
- uint64_t need = (sizeof(s)+s-read_buf.length());
+ uint64_t need = sizeof(s) + s;
if (need > fetch_len) {
+ temp_fetch_len = sizeof(s) + s;
ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
<< " for len " << s << " entry" << dendl;
- temp_fetch_len = need;
}
+
+ ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
return false;
}
diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h
index 81a39d21a5b..dfc2115c18b 100644
--- a/src/osdc/Journaler.h
+++ b/src/osdc/Journaler.h
@@ -194,7 +194,6 @@ private:
uint64_t fetch_len; // how much to read at a time
uint64_t temp_fetch_len;
- uint64_t prefetch_from; // how far from end do we read next chunk
// for wait_for_readable()
Context *on_readable;
@@ -251,7 +250,7 @@ public:
prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0),
waiting_for_zero(false),
read_pos(0), requested_pos(0), received_pos(0),
- fetch_len(0), temp_fetch_len(0), prefetch_from(0),
+ fetch_len(0), temp_fetch_len(0),
on_readable(0), on_write_error(NULL),
expire_pos(0), trimming_pos(0), trimmed_pos(0)
{
@@ -273,7 +272,6 @@ public:
requested_pos = 0;
received_pos = 0;
fetch_len = 0;
- prefetch_from = 0;
assert(!on_readable);
expire_pos = 0;
trimming_pos = 0;
diff --git a/src/test/cli-integration/rbd/formatted-output.t b/src/test/cli-integration/rbd/formatted-output.t
index ef0b7042c8a..7be9c594a44 100644
--- a/src/test/cli-integration/rbd/formatted-output.t
+++ b/src/test/cli-integration/rbd/formatted-output.t
@@ -19,7 +19,7 @@ clone
$ rbd snap protect bar@snap
$ rbd clone bar@snap data/child
$ rbd snap create data/child@snap
- $ rbd flatten data/child > /dev/null
+ $ rbd flatten data/child 2> /dev/null
lock
====
@@ -687,10 +687,10 @@ whenever it is run. grep -v to ignore it, but still work on other distros.
# cleanup
$ rbd snap remove data/child@snap
$ rbd snap unprotect bar@snap
- $ rbd snap purge bar > /dev/null
- $ rbd snap purge foo > /dev/null
- $ rbd rm data/child > /dev/null
- $ rbd rm foo > /dev/null
- $ rbd rm bar > /dev/null
- $ rbd rm quux > /dev/null
- $ rbd rm baz > /dev/null
+ $ rbd snap purge bar 2> /dev/null
+ $ rbd snap purge foo 2> /dev/null
+ $ rbd rm data/child 2> /dev/null
+ $ rbd rm foo 2> /dev/null
+ $ rbd rm bar 2> /dev/null
+ $ rbd rm quux 2> /dev/null
+ $ rbd rm baz 2> /dev/null
diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc
index 216e6288b1f..07f999180a3 100644
--- a/src/test/mon/test_mon_workloadgen.cc
+++ b/src/test/mon/test_mon_workloadgen.cc
@@ -366,8 +366,8 @@ class OSDStub : public TestStub
messenger->set_default_policy(
Messenger::Policy::stateless_server(supported, 0));
- messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT,
- &throttler);
+ messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+ &throttler, NULL);
messenger->set_policy(entity_name_t::TYPE_MON,
Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID |
CEPH_FEATURE_PGID64 |