summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-08-15 17:21:00 -0700
committerSage Weil <sage@inktank.com>2013-08-15 17:21:00 -0700
commit913a205c97858afc90617a3ef6097a9f0f2eaec7 (patch)
tree05c6ff5debdec8eb0aa905f8febdb8c0a0da6b74
parent3f0afe162e8c6aa5ab3bfc094cfafbc5cd5279d7 (diff)
parent56e54cc489329febe132777526572c09635a5cd1 (diff)
downloadceph-913a205c97858afc90617a3ef6097a9f0f2eaec7.tar.gz
Merge remote-tracking branch 'gh/next'
-rwxr-xr-xsrc/ceph.in37
-rw-r--r--src/common/ceph_context.cc8
-rw-r--r--src/include/str_list.h15
-rw-r--r--src/librados/AioCompletionImpl.h14
-rw-r--r--src/librados/IoCtxImpl.cc29
-rw-r--r--src/mon/LogMonitor.cc6
-rw-r--r--src/mon/Monitor.cc10
-rw-r--r--src/pybind/rados.py12
-rw-r--r--src/test/pybind/test_rados.py33
-rw-r--r--src/tools/ceph-monstore-tool.cc80
10 files changed, 186 insertions, 58 deletions
diff --git a/src/ceph.in b/src/ceph.in
index fe369f82467..38464dfc2a2 100755
--- a/src/ceph.in
+++ b/src/ceph.in
@@ -506,35 +506,36 @@ def main():
format = parsed_args.output_format
+ sockpath = None
if parsed_args.admin_socket:
- try:
- print admin_socket(parsed_args.admin_socket, childargs, format)
- except Exception as e:
- print >> sys.stderr, 'admin_socket: {0}'.format(e)
- return 0
-
- if len(childargs) > 0 and childargs[0] == "daemon":
+ sockpath = parsed_args.admin_socket
+ elif len(childargs) > 0 and childargs[0] == "daemon":
+ # Treat "daemon <path>" or "daemon <name>" like --admin_daemon <path>
if len(childargs) > 2:
if childargs[1].find('/') >= 0:
- try:
- print admin_socket(childargs[1], childargs[2:], format)
- except Exception as e:
- print >> sys.stderr, 'admin_socket: {0}'.format(e)
- return errno.EINVAL
- return 0
+ sockpath = childargs[1]
else:
# try resolve daemon name
- path = ceph_conf('admin_socket', childargs[1])
try:
- print admin_socket(path, childargs[2:], format)
+ sockpath = ceph_conf('admin_socket', childargs[1])
except Exception as e:
- print >> sys.stderr, 'admin_socket: {0}'.format(e)
+ print >> sys.stderr, \
+ 'Can\'t get admin socket path: ' + str(e)
return errno.EINVAL
- return 0
+ # for both:
+ childargs = childargs[2:]
else:
- print >> sys.stderr, 'Daemon requires at least 2 arguments'
+ print >> sys.stderr, 'daemon requires at least 3 arguments'
return errno.EINVAL
+ if sockpath:
+ try:
+ print admin_socket(sockpath, childargs, format)
+ except Exception as e:
+ print >> sys.stderr, 'admin_socket: {0}'.format(e)
+ return errno.EINVAL
+ return 0
+
# handle any 'generic' ceph arguments that we didn't parse here
global cluster_handle
diff --git a/src/common/ceph_context.cc b/src/common/ceph_context.cc
index 9602fdf2e40..e694a2f09b3 100644
--- a/src/common/ceph_context.cc
+++ b/src/common/ceph_context.cc
@@ -26,6 +26,7 @@
#include "common/Formatter.h"
#include "log/Log.h"
#include "auth/Crypto.h"
+#include "include/str_list.h"
#include <iostream>
#include <pthread.h>
@@ -197,11 +198,10 @@ void CephContext::do_command(std::string command, cmdmap_t& cmdmap,
f->dump_string("error", "syntax error: 'config set <var> <value>'");
} else {
// val may be multiple words
- ostringstream argss;
- std::copy(val.begin(), val.end(), ostream_iterator<string>(argss, " "));
- int r = _conf->set_val(var.c_str(), argss.str().c_str());
+ string valstr = str_join(val, " ");
+ int r = _conf->set_val(var.c_str(), valstr.c_str());
if (r < 0) {
- f->dump_stream("error") << "error setting '" << var << "' to '" << val << "': " << cpp_strerror(r);
+ f->dump_stream("error") << "error setting '" << var << "' to '" << valstr << "': " << cpp_strerror(r);
} else {
ostringstream ss;
_conf->apply_changes(&ss);
diff --git a/src/include/str_list.h b/src/include/str_list.h
index 8549c4f21ce..83a0e64e135 100644
--- a/src/include/str_list.h
+++ b/src/include/str_list.h
@@ -1,9 +1,10 @@
#ifndef CEPH_STRLIST_H
#define CEPH_STRLIST_H
-#include <string>
#include <list>
#include <set>
+#include <sstream>
+#include <string>
#include <vector>
extern void get_str_list(const std::string& str,
@@ -22,5 +23,17 @@ extern void get_str_set(const std::string& str,
const char *delims,
std::set<std::string>& str_list);
+inline std::string str_join(const std::vector<std::string>& v, std::string sep)
+{
+ if (v.empty())
+ return std::string();
+ std::vector<std::string>::const_iterator i = v.begin();
+ std::string r = *i;
+ for (++i; i != v.end(); ++i) {
+ r += sep;
+ r += *i;
+ }
+ return r;
+}
#endif
diff --git a/src/librados/AioCompletionImpl.h b/src/librados/AioCompletionImpl.h
index 34462d22063..cf049e0a9a2 100644
--- a/src/librados/AioCompletionImpl.h
+++ b/src/librados/AioCompletionImpl.h
@@ -134,10 +134,14 @@ struct librados::AioCompletionImpl {
void get() {
lock.Lock();
- assert(ref > 0);
- ref++;
+ _get();
lock.Unlock();
}
+ void _get() {
+ assert(lock.is_locked());
+ assert(ref > 0);
+ ++ref;
+ }
void release() {
lock.Lock();
assert(!released);
@@ -162,7 +166,7 @@ struct C_AioComplete : public Context {
AioCompletionImpl *c;
C_AioComplete(AioCompletionImpl *cc) : c(cc) {
- c->ref++;
+ c->_get();
}
void finish(int r) {
@@ -181,7 +185,7 @@ struct C_AioSafe : public Context {
AioCompletionImpl *c;
C_AioSafe(AioCompletionImpl *cc) : c(cc) {
- c->ref++;
+ c->_get();
}
void finish(int r) {
@@ -208,7 +212,7 @@ struct C_AioCompleteAndSafe : public Context {
AioCompletionImpl *c;
C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) {
- c->ref++;
+ c->get();
}
void finish(int r) {
diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc
index b5da027e8f6..ce9743a54b3 100644
--- a/src/librados/IoCtxImpl.cc
+++ b/src/librados/IoCtxImpl.cc
@@ -82,22 +82,25 @@ void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
aio_write_list_lock.Lock();
assert(c->io == this);
c->aio_write_list_item.remove_myself();
- // queue async flush waiters
- map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters =
- aio_write_waiters.find(c->aio_write_seq);
- if (waiters != aio_write_waiters.end()) {
- ldout(client->cct, 20) << "found " << waiters->second.size()
- << " waiters" << dendl;
+
+ map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin();
+ while (waiters != aio_write_waiters.end()) {
+ if (!aio_write_list.empty() &&
+ aio_write_list.front()->aio_write_seq <= waiters->first) {
+ ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq
+ << " <= waiter " << waiters->first
+ << ", stopping" << dendl;
+ break;
+ }
+ ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl;
for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
it != waiters->second.end(); ++it) {
client->finisher.queue(new C_AioCompleteAndSafe(*it));
(*it)->put();
}
- aio_write_waiters.erase(waiters);
- } else {
- ldout(client->cct, 20) << "found no waiters for tid "
- << c->aio_write_seq << dendl;
+ aio_write_waiters.erase(waiters++);
}
+
aio_write_cond.Signal();
aio_write_list_lock.Unlock();
put();
@@ -109,11 +112,13 @@ void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
<< " completion " << c << dendl;
Mutex::Locker l(aio_write_list_lock);
tid_t seq = aio_write_seq;
- ldout(client->cct, 20) << "flush_aio_writes_async waiting on tid "
- << seq << dendl;
if (aio_write_list.empty()) {
+ ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid "
+ << seq << ")" << dendl;
client->finisher.queue(new C_AioCompleteAndSafe(c));
} else {
+ ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size()
+ << " writes in flight; waiting on tid " << seq << dendl;
c->get();
aio_write_waiters[seq].push_back(c);
}
diff --git a/src/mon/LogMonitor.cc b/src/mon/LogMonitor.cc
index 792bc682de4..a545242577d 100644
--- a/src/mon/LogMonitor.cc
+++ b/src/mon/LogMonitor.cc
@@ -29,6 +29,7 @@
#include "common/errno.h"
#include "common/config.h"
#include "include/assert.h"
+#include "include/str_list.h"
#define dout_subsys ceph_subsys_mon
#undef dout_prefix
@@ -372,15 +373,12 @@ bool LogMonitor::prepare_command(MMonCommand *m)
if (prefix == "log") {
vector<string> logtext;
cmd_getval(g_ceph_context, cmdmap, "logtext", logtext);
- ostringstream ds;
- std::copy(logtext.begin(), logtext.end(),
- ostream_iterator<string>(ds, " "));
LogEntry le;
le.who = m->get_orig_source_inst();
le.stamp = m->get_recv_stamp();
le.seq = 0;
le.type = CLOG_INFO;
- le.msg = ds.str();
+ le.msg = str_join(logtext, " ");
pending_summary.add(le);
pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, string(), get_last_committed()));
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index cf10f911258..61e2a2aa57a 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -2055,11 +2055,8 @@ void Monitor::handle_command(MMonCommand *m)
cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
if (!injected_args.empty()) {
dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
- ostringstream argss;
- std::copy(injected_args.begin(), injected_args.end(),
- ostream_iterator<string>(argss, " "));
ostringstream oss;
- r = g_conf->injectargs(argss.str().c_str(), &oss);
+ r = g_conf->injectargs(str_join(injected_args, " "), &oss);
ss << "injectargs:" << oss.str();
rs = ss.str();
goto out;
@@ -2131,10 +2128,7 @@ void Monitor::handle_command(MMonCommand *m)
vector<string> tagsvec;
cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
- stringstream tags;
- std::copy(tagsvec.begin(), tagsvec.end(),
- ostream_iterator<string>(tags, " "));
- string tagstr = tags.str();
+ string tagstr = str_join(tagsvec, " ");
if (!tagstr.empty())
tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
f->dump_string("tag", tagstr);
diff --git a/src/pybind/rados.py b/src/pybind/rados.py
index e543ff79305..7768f8c39d3 100644
--- a/src/pybind/rados.py
+++ b/src/pybind/rados.py
@@ -182,19 +182,27 @@ class Rados(object):
raise RadosStateError("You cannot perform that operation on a \
Rados object in state %s." % (self.state))
- def __init__(self, rados_id=None, name='client.admin', clustername='ceph',
+ def __init__(self, rados_id=None, name=None, clustername=None,
conf_defaults=None, conffile=None, conf=None, flags=0):
self.librados = CDLL('librados.so.2')
self.cluster = c_void_p()
self.rados_id = rados_id
- if rados_id and not isinstance(rados_id, str):
+ if rados_id is not None and not isinstance(rados_id, str):
raise TypeError('rados_id must be a string or None')
if conffile is not None and not isinstance(conffile, str):
raise TypeError('conffile must be a string or None')
+ if name is not None and not isinstance(name, str):
+ raise TypeError('name must be a string or None')
+ if clustername is not None and not isinstance(clustername, str):
+ raise TypeError('clustername must be a string or None')
if rados_id and name:
raise Error("Rados(): can't supply both rados_id and name")
if rados_id:
name = 'client.' + rados_id
+ if name is None:
+ name = 'client.admin'
+ if clustername is None:
+ clustername = 'ceph'
ret = run_in_thread(self.librados.rados_create2,
(byref(self.cluster), c_char_p(clustername),
c_char_p(name), c_uint64(flags)))
diff --git a/src/test/pybind/test_rados.py b/src/test/pybind/test_rados.py
index 5176d6383d5..4628a44a652 100644
--- a/src/test/pybind/test_rados.py
+++ b/src/test/pybind/test_rados.py
@@ -1,10 +1,41 @@
from nose.tools import eq_ as eq, assert_raises
-from rados import (Rados, Object, ObjectExists, ObjectNotFound,
+from rados import (Rados, Error, Object, ObjectExists, ObjectNotFound,
ANONYMOUS_AUID, ADMIN_AUID)
import threading
import json
import errno
+def test_rados_init_error():
+ assert_raises(Error, Rados, conffile='', rados_id='admin',
+ name='client.admin')
+ assert_raises(Error, Rados, conffile='', name='invalid')
+ assert_raises(Error, Rados, conffile='', name='bad.invalid')
+
+def test_rados_init_type_error():
+ assert_raises(TypeError, Rados, rados_id=u'admin')
+ assert_raises(TypeError, Rados, rados_id=u'')
+ assert_raises(TypeError, Rados, name=u'client.admin')
+ assert_raises(TypeError, Rados, name=u'')
+ assert_raises(TypeError, Rados, conffile=u'blah')
+ assert_raises(TypeError, Rados, conffile=u'')
+ assert_raises(TypeError, Rados, clusternaem=u'blah')
+ assert_raises(TypeError, Rados, clustername=u'')
+
+def test_rados_init():
+ with Rados(conffile='', rados_id='admin'):
+ pass
+ with Rados(conffile='', name='client.admin'):
+ pass
+ with Rados(conffile='', name='client.admin'):
+ pass
+ with Rados(conffile='', name='client.admin'):
+ pass
+
+def test_ioctx_context_manager():
+ with Rados(conffile='', rados_id='admin') as conn:
+ with conn.open_ioctx('data') as ioctx:
+ pass
+
class TestRados(object):
def setUp(self):
diff --git a/src/tools/ceph-monstore-tool.cc b/src/tools/ceph-monstore-tool.cc
index f361266aff0..4ab8fa86465 100644
--- a/src/tools/ceph-monstore-tool.cc
+++ b/src/tools/ceph-monstore-tool.cc
@@ -33,6 +33,7 @@
#include "mon/MonitorDBStore.h"
#include "mon/Paxos.h"
#include "common/Formatter.h"
+#include "include/stringify.h"
namespace po = boost::program_options;
using namespace std;
@@ -180,13 +181,20 @@ int main(int argc, char **argv) {
if (vm.count("out")) {
if ((fd = open(out_path.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0666)) == -1) {
int _err = errno;
- std::cerr << "Couldn't open " << out_path << cpp_strerror(_err) << std::endl;
- return 1;
+ if (_err != EISDIR) {
+ std::cerr << "Couldn't open " << out_path << ": " << cpp_strerror(_err) << std::endl;
+ return 1;
+ }
}
} else {
fd = STDOUT_FILENO;
}
+ if (fd < 0 && cmd != "store-copy") {
+ std::cerr << "error: '" << out_path << "' is a directory!" << std::endl;
+ return 1;
+ }
+
MonitorDBStore st(store_path);
if (store_path.size()) {
stringstream ss;
@@ -331,13 +339,79 @@ int main(int argc, char **argv) {
t.compact_prefix(prefix);
st.apply_transaction(t);
}
+ } else if (cmd == "store-copy") {
+ if (!store_path.size()) {
+ std::cerr << "need mon store path to copy from" << std::endl;
+ std::cerr << desc << std::endl;
+ goto done;
+ }
+ if (!out_path.size()) {
+ std::cerr << "need mon store path to copy to (--out <mon_data_dir>)"
+ << std::endl;
+ std::cerr << desc << std::endl;
+ goto done;
+ }
+ if (fd > 0) {
+ std::cerr << "supplied out path '" << out_path << "' is not a directory"
+ << std::endl;
+ goto done;
+ }
+
+ MonitorDBStore out_store(out_path);
+ {
+ stringstream ss;
+ int r = out_store.create_and_open(ss);
+ if (r < 0) {
+ std::cerr << ss.str() << std::endl;
+ goto done;
+ }
+ }
+
+
+ KeyValueDB::WholeSpaceIterator it = st.get_iterator();
+ uint64_t total_keys = 0;
+ uint64_t total_size = 0;
+ uint64_t total_tx = 0;
+
+ do {
+ uint64_t num_keys = 0;
+
+ MonitorDBStore::Transaction tx;
+
+ while (it->valid() && num_keys < 128) {
+ pair<string,string> k = it->raw_key();
+ bufferlist v = it->value();
+ tx.put(k.first, k.second, v);
+
+ num_keys ++;
+ total_tx ++;
+ total_size += v.length();
+
+ it->next();
+ }
+
+ total_keys += num_keys;
+
+ if (!tx.empty())
+ out_store.apply_transaction(tx);
+
+ std::cout << "copied " << total_keys << " keys so far ("
+ << stringify(si_t(total_size)) << ")" << std::endl;
+
+ } while (it->valid());
+
+ std::cout << "summary: copied " << total_keys << " keys, using "
+ << total_tx << " transactions, totalling "
+ << stringify(si_t(total_size)) << std::endl;
+ std::cout << "from '" << store_path << "' to '" << out_path << "'"
+ << std::endl;
} else {
std::cerr << "Unrecognized command: " << cmd << std::endl;
goto done;
}
done:
- if (vm.count("out")) {
+ if (vm.count("out") && fd > 0) {
::close(fd);
}
return 0;