diff options
author | Sage Weil <sage@inktank.com> | 2013-08-15 17:21:00 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-08-15 17:21:00 -0700 |
commit | 913a205c97858afc90617a3ef6097a9f0f2eaec7 (patch) | |
tree | 05c6ff5debdec8eb0aa905f8febdb8c0a0da6b74 | |
parent | 3f0afe162e8c6aa5ab3bfc094cfafbc5cd5279d7 (diff) | |
parent | 56e54cc489329febe132777526572c09635a5cd1 (diff) | |
download | ceph-913a205c97858afc90617a3ef6097a9f0f2eaec7.tar.gz |
Merge remote-tracking branch 'gh/next'
-rwxr-xr-x | src/ceph.in | 37 | ||||
-rw-r--r-- | src/common/ceph_context.cc | 8 | ||||
-rw-r--r-- | src/include/str_list.h | 15 | ||||
-rw-r--r-- | src/librados/AioCompletionImpl.h | 14 | ||||
-rw-r--r-- | src/librados/IoCtxImpl.cc | 29 | ||||
-rw-r--r-- | src/mon/LogMonitor.cc | 6 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 10 | ||||
-rw-r--r-- | src/pybind/rados.py | 12 | ||||
-rw-r--r-- | src/test/pybind/test_rados.py | 33 | ||||
-rw-r--r-- | src/tools/ceph-monstore-tool.cc | 80 |
10 files changed, 186 insertions, 58 deletions
diff --git a/src/ceph.in b/src/ceph.in index fe369f82467..38464dfc2a2 100755 --- a/src/ceph.in +++ b/src/ceph.in @@ -506,35 +506,36 @@ def main(): format = parsed_args.output_format + sockpath = None if parsed_args.admin_socket: - try: - print admin_socket(parsed_args.admin_socket, childargs, format) - except Exception as e: - print >> sys.stderr, 'admin_socket: {0}'.format(e) - return 0 - - if len(childargs) > 0 and childargs[0] == "daemon": + sockpath = parsed_args.admin_socket + elif len(childargs) > 0 and childargs[0] == "daemon": + # Treat "daemon <path>" or "daemon <name>" like --admin_daemon <path> if len(childargs) > 2: if childargs[1].find('/') >= 0: - try: - print admin_socket(childargs[1], childargs[2:], format) - except Exception as e: - print >> sys.stderr, 'admin_socket: {0}'.format(e) - return errno.EINVAL - return 0 + sockpath = childargs[1] else: # try resolve daemon name - path = ceph_conf('admin_socket', childargs[1]) try: - print admin_socket(path, childargs[2:], format) + sockpath = ceph_conf('admin_socket', childargs[1]) except Exception as e: - print >> sys.stderr, 'admin_socket: {0}'.format(e) + print >> sys.stderr, \ + 'Can\'t get admin socket path: ' + str(e) return errno.EINVAL - return 0 + # for both: + childargs = childargs[2:] else: - print >> sys.stderr, 'Daemon requires at least 2 arguments' + print >> sys.stderr, 'daemon requires at least 3 arguments' return errno.EINVAL + if sockpath: + try: + print admin_socket(sockpath, childargs, format) + except Exception as e: + print >> sys.stderr, 'admin_socket: {0}'.format(e) + return errno.EINVAL + return 0 + # handle any 'generic' ceph arguments that we didn't parse here global cluster_handle diff --git a/src/common/ceph_context.cc b/src/common/ceph_context.cc index 9602fdf2e40..e694a2f09b3 100644 --- a/src/common/ceph_context.cc +++ b/src/common/ceph_context.cc @@ -26,6 +26,7 @@ #include "common/Formatter.h" #include "log/Log.h" #include "auth/Crypto.h" +#include "include/str_list.h" #include <iostream> #include <pthread.h> @@ -197,11 +198,10 @@ void CephContext::do_command(std::string command, cmdmap_t& cmdmap, f->dump_string("error", "syntax error: 'config set <var> <value>'"); } else { // val may be multiple words - ostringstream argss; - std::copy(val.begin(), val.end(), ostream_iterator<string>(argss, " ")); - int r = _conf->set_val(var.c_str(), argss.str().c_str()); + string valstr = str_join(val, " "); + int r = _conf->set_val(var.c_str(), valstr.c_str()); if (r < 0) { - f->dump_stream("error") << "error setting '" << var << "' to '" << val << "': " << cpp_strerror(r); + f->dump_stream("error") << "error setting '" << var << "' to '" << valstr << "': " << cpp_strerror(r); } else { ostringstream ss; _conf->apply_changes(&ss); diff --git a/src/include/str_list.h b/src/include/str_list.h index 8549c4f21ce..83a0e64e135 100644 --- a/src/include/str_list.h +++ b/src/include/str_list.h @@ -1,9 +1,10 @@ #ifndef CEPH_STRLIST_H #define CEPH_STRLIST_H -#include <string> #include <list> #include <set> +#include <sstream> +#include <string> #include <vector> extern void get_str_list(const std::string& str, @@ -22,5 +23,17 @@ extern void get_str_set(const std::string& str, const char *delims, std::set<std::string>& str_list); +inline std::string str_join(const std::vector<std::string>& v, std::string sep) +{ + if (v.empty()) + return std::string(); + std::vector<std::string>::const_iterator i = v.begin(); + std::string r = *i; + for (++i; i != v.end(); ++i) { + r += sep; + r += *i; + } + return r; +} #endif diff --git a/src/librados/AioCompletionImpl.h b/src/librados/AioCompletionImpl.h index 34462d22063..cf049e0a9a2 100644 --- a/src/librados/AioCompletionImpl.h +++ b/src/librados/AioCompletionImpl.h @@ -134,10 +134,14 @@ struct librados::AioCompletionImpl { void get() { lock.Lock(); - assert(ref > 0); - ref++; + _get(); lock.Unlock(); } + void _get() { + assert(lock.is_locked()); + assert(ref > 0); + ++ref; + } void release() { lock.Lock(); assert(!released); @@ -162,7 +166,7 @@ struct C_AioComplete : public Context { AioCompletionImpl *c; C_AioComplete(AioCompletionImpl *cc) : c(cc) { - c->ref++; + c->_get(); } void finish(int r) { @@ -181,7 +185,7 @@ struct C_AioSafe : public Context { AioCompletionImpl *c; C_AioSafe(AioCompletionImpl *cc) : c(cc) { - c->ref++; + c->_get(); } void finish(int r) { @@ -208,7 +212,7 @@ struct C_AioCompleteAndSafe : public Context { AioCompletionImpl *c; C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) { - c->ref++; + c->get(); } void finish(int r) { diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index b5da027e8f6..ce9743a54b3 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -82,22 +82,25 @@ void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c) aio_write_list_lock.Lock(); assert(c->io == this); c->aio_write_list_item.remove_myself(); - // queue async flush waiters - map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters = - aio_write_waiters.find(c->aio_write_seq); - if (waiters != aio_write_waiters.end()) { - ldout(client->cct, 20) << "found " << waiters->second.size() - << " waiters" << dendl; + + map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin(); + while (waiters != aio_write_waiters.end()) { + if (!aio_write_list.empty() && + aio_write_list.front()->aio_write_seq <= waiters->first) { + ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq + << " <= waiter " << waiters->first + << ", stopping" << dendl; + break; + } + ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl; for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin(); it != waiters->second.end(); ++it) { client->finisher.queue(new C_AioCompleteAndSafe(*it)); (*it)->put(); } - aio_write_waiters.erase(waiters); - } else { - ldout(client->cct, 20) << "found no waiters for tid " - << c->aio_write_seq << dendl; + aio_write_waiters.erase(waiters++); } + aio_write_cond.Signal(); aio_write_list_lock.Unlock(); put(); @@ -109,11 +112,13 @@ void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c) << " completion " << c << dendl; Mutex::Locker l(aio_write_list_lock); tid_t seq = aio_write_seq; - ldout(client->cct, 20) << "flush_aio_writes_async waiting on tid " - << seq << dendl; if (aio_write_list.empty()) { + ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid " + << seq << ")" << dendl; client->finisher.queue(new C_AioCompleteAndSafe(c)); } else { + ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size() + << " writes in flight; waiting on tid " << seq << dendl; c->get(); aio_write_waiters[seq].push_back(c); } diff --git a/src/mon/LogMonitor.cc b/src/mon/LogMonitor.cc index 792bc682de4..a545242577d 100644 --- a/src/mon/LogMonitor.cc +++ b/src/mon/LogMonitor.cc @@ -29,6 +29,7 @@ #include "common/errno.h" #include "common/config.h" #include "include/assert.h" +#include "include/str_list.h" #define dout_subsys ceph_subsys_mon #undef dout_prefix @@ -372,15 +373,12 @@ bool LogMonitor::prepare_command(MMonCommand *m) if (prefix == "log") { vector<string> logtext; cmd_getval(g_ceph_context, cmdmap, "logtext", logtext); - ostringstream ds; - std::copy(logtext.begin(), logtext.end(), - ostream_iterator<string>(ds, " ")); LogEntry le; le.who = m->get_orig_source_inst(); le.stamp = m->get_recv_stamp(); le.seq = 0; le.type = CLOG_INFO; - le.msg = ds.str(); + le.msg = str_join(logtext, " "); pending_summary.add(le); pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le)); wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, string(), get_last_committed())); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index cf10f911258..61e2a2aa57a 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -2055,11 +2055,8 @@ void Monitor::handle_command(MMonCommand *m) cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args); if (!injected_args.empty()) { dout(0) << "parsing injected options '" << injected_args << "'" << dendl; - ostringstream argss; - std::copy(injected_args.begin(), injected_args.end(), - ostream_iterator<string>(argss, " ")); ostringstream oss; - r = g_conf->injectargs(argss.str().c_str(), &oss); + r = g_conf->injectargs(str_join(injected_args, " "), &oss); ss << "injectargs:" << oss.str(); rs = ss.str(); goto out; @@ -2131,10 +2128,7 @@ void Monitor::handle_command(MMonCommand *m) vector<string> tagsvec; cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec); - stringstream tags; - std::copy(tagsvec.begin(), tagsvec.end(), - ostream_iterator<string>(tags, " ")); - string tagstr = tags.str(); + string tagstr = str_join(tagsvec, " "); if (!tagstr.empty()) tagstr = tagstr.substr(0, tagstr.find_last_of(' ')); f->dump_string("tag", tagstr); diff --git a/src/pybind/rados.py b/src/pybind/rados.py index e543ff79305..7768f8c39d3 100644 --- a/src/pybind/rados.py +++ b/src/pybind/rados.py @@ -182,19 +182,27 @@ class Rados(object): raise RadosStateError("You cannot perform that operation on a \ Rados object in state %s." % (self.state)) - def __init__(self, rados_id=None, name='client.admin', clustername='ceph', + def __init__(self, rados_id=None, name=None, clustername=None, conf_defaults=None, conffile=None, conf=None, flags=0): self.librados = CDLL('librados.so.2') self.cluster = c_void_p() self.rados_id = rados_id - if rados_id and not isinstance(rados_id, str): + if rados_id is not None and not isinstance(rados_id, str): raise TypeError('rados_id must be a string or None') if conffile is not None and not isinstance(conffile, str): raise TypeError('conffile must be a string or None') + if name is not None and not isinstance(name, str): + raise TypeError('name must be a string or None') + if clustername is not None and not isinstance(clustername, str): + raise TypeError('clustername must be a string or None') if rados_id and name: raise Error("Rados(): can't supply both rados_id and name") if rados_id: name = 'client.' + rados_id + if name is None: + name = 'client.admin' + if clustername is None: + clustername = 'ceph' ret = run_in_thread(self.librados.rados_create2, (byref(self.cluster), c_char_p(clustername), c_char_p(name), c_uint64(flags))) diff --git a/src/test/pybind/test_rados.py b/src/test/pybind/test_rados.py index 5176d6383d5..4628a44a652 100644 --- a/src/test/pybind/test_rados.py +++ b/src/test/pybind/test_rados.py @@ -1,10 +1,41 @@ from nose.tools import eq_ as eq, assert_raises -from rados import (Rados, Object, ObjectExists, ObjectNotFound, +from rados import (Rados, Error, Object, ObjectExists, ObjectNotFound, ANONYMOUS_AUID, ADMIN_AUID) import threading import json import errno +def test_rados_init_error(): + assert_raises(Error, Rados, conffile='', rados_id='admin', + name='client.admin') + assert_raises(Error, Rados, conffile='', name='invalid') + assert_raises(Error, Rados, conffile='', name='bad.invalid') + +def test_rados_init_type_error(): + assert_raises(TypeError, Rados, rados_id=u'admin') + assert_raises(TypeError, Rados, rados_id=u'') + assert_raises(TypeError, Rados, name=u'client.admin') + assert_raises(TypeError, Rados, name=u'') + assert_raises(TypeError, Rados, conffile=u'blah') + assert_raises(TypeError, Rados, conffile=u'') + assert_raises(TypeError, Rados, clusternaem=u'blah') + assert_raises(TypeError, Rados, clustername=u'') + +def test_rados_init(): + with Rados(conffile='', rados_id='admin'): + pass + with Rados(conffile='', name='client.admin'): + pass + with Rados(conffile='', name='client.admin'): + pass + with Rados(conffile='', name='client.admin'): + pass + +def test_ioctx_context_manager(): + with Rados(conffile='', rados_id='admin') as conn: + with conn.open_ioctx('data') as ioctx: + pass + class TestRados(object): def setUp(self): diff --git a/src/tools/ceph-monstore-tool.cc b/src/tools/ceph-monstore-tool.cc index f361266aff0..4ab8fa86465 100644 --- a/src/tools/ceph-monstore-tool.cc +++ b/src/tools/ceph-monstore-tool.cc @@ -33,6 +33,7 @@ #include "mon/MonitorDBStore.h" #include "mon/Paxos.h" #include "common/Formatter.h" +#include "include/stringify.h" namespace po = boost::program_options; using namespace std; @@ -180,13 +181,20 @@ int main(int argc, char **argv) { if (vm.count("out")) { if ((fd = open(out_path.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0666)) == -1) { int _err = errno; - std::cerr << "Couldn't open " << out_path << cpp_strerror(_err) << std::endl; - return 1; + if (_err != EISDIR) { + std::cerr << "Couldn't open " << out_path << ": " << cpp_strerror(_err) << std::endl; + return 1; + } } } else { fd = STDOUT_FILENO; } + if (fd < 0 && cmd != "store-copy") { + std::cerr << "error: '" << out_path << "' is a directory!" << std::endl; + return 1; + } + MonitorDBStore st(store_path); if (store_path.size()) { stringstream ss; @@ -331,13 +339,79 @@ int main(int argc, char **argv) { t.compact_prefix(prefix); st.apply_transaction(t); } + } else if (cmd == "store-copy") { + if (!store_path.size()) { + std::cerr << "need mon store path to copy from" << std::endl; + std::cerr << desc << std::endl; + goto done; + } + if (!out_path.size()) { + std::cerr << "need mon store path to copy to (--out <mon_data_dir>)" + << std::endl; + std::cerr << desc << std::endl; + goto done; + } + if (fd > 0) { + std::cerr << "supplied out path '" << out_path << "' is not a directory" + << std::endl; + goto done; + } + + MonitorDBStore out_store(out_path); + { + stringstream ss; + int r = out_store.create_and_open(ss); + if (r < 0) { + std::cerr << ss.str() << std::endl; + goto done; + } + } + + + KeyValueDB::WholeSpaceIterator it = st.get_iterator(); + uint64_t total_keys = 0; + uint64_t total_size = 0; + uint64_t total_tx = 0; + + do { + uint64_t num_keys = 0; + + MonitorDBStore::Transaction tx; + + while (it->valid() && num_keys < 128) { + pair<string,string> k = it->raw_key(); + bufferlist v = it->value(); + tx.put(k.first, k.second, v); + + num_keys ++; + total_tx ++; + total_size += v.length(); + + it->next(); + } + + total_keys += num_keys; + + if (!tx.empty()) + out_store.apply_transaction(tx); + + std::cout << "copied " << total_keys << " keys so far (" + << stringify(si_t(total_size)) << ")" << std::endl; + + } while (it->valid()); + + std::cout << "summary: copied " << total_keys << " keys, using " + << total_tx << " transactions, totalling " + << stringify(si_t(total_size)) << std::endl; + std::cout << "from '" << store_path << "' to '" << out_path << "'" + << std::endl; } else { std::cerr << "Unrecognized command: " << cmd << std::endl; goto done; } done: - if (vm.count("out")) { + if (vm.count("out") && fd > 0) { ::close(fd); } return 0; |