29 files changed, 2 insertions, 6141 deletions
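The deleted src/client/hadoop/CephFSInterface.cc below is a thin JNI shim over the public libcephfs C API: each native method boils down to one or two ceph_* calls on a cached struct ceph_mount_info. As a rough, non-authoritative sketch of the read path those bindings wrapped (error handling elided; the header path <cephfs/libcephfs.h> and the file name are assumptions, not taken from this commit):

#include <cephfs/libcephfs.h>
#include <fcntl.h>

int main(void)
{
  struct ceph_mount_info *cmount;
  char buf[4096];

  ceph_create(&cmount, NULL);         /* ceph_initializeClient: create the handle ... */
  ceph_conf_read_file(cmount, NULL);  /* ... read ceph.conf from the default location ... */
  ceph_mount(cmount, "/");            /* ... and mount the filesystem */

  int fd = ceph_open(cmount, "/some/file", O_RDONLY, 0); /* ceph_open_for_read */
  ceph_read(cmount, fd, buf, sizeof(buf), -1);           /* ceph_read (-1 = current offset) */
  ceph_close(cmount, fd);                                /* ceph_close */

  ceph_shutdown(cmount);              /* ceph_kill_client */
  return 0;
}

The Java-facing path that survives this commit is the libcephfs JNI binding built under ENABLE_CEPHFS_JAVA in src/Makefile.am; correspondingly, the logging table in doc/rados/troubleshooting/log-and-debug.rst swaps its ``hadoop`` row for ``javaclient``.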
@@ -6,10 +6,6 @@ Files: doc/*
 Copyright: (c) 2010-2012 New Dream Network and contributors
 License: Creative Commons Attribution-ShareAlike (CC BY-SA)
 
-Files: src/client/hadoop/ceph
-Copyright: Copyright (C) New Dream Network and contributors
-License: Apache License v2
-
 Files: src/mount/canonicalize.c
 Copyright: Copyright (C) 1993 Rick Sladkey <jrs@world.std.com>
 License: LGPL2 or later
diff --git a/ceph.spec.in b/ceph.spec.in
index 8091018c1dc..851ee7acfd5 100644
--- a/ceph.spec.in
+++ b/ceph.spec.in
@@ -278,7 +278,6 @@ export RPM_OPT_FLAGS=`echo $RPM_OPT_FLAGS | sed -e 's/i386/i486/'`
 	--localstatedir=/var \
 	--sysconfdir=/etc \
 	--docdir=%{_docdir}/ceph \
-	--without-hadoop \
 	--with-nss \
 	--without-cryptopp \
 	--with-rest-bench \
diff --git a/configure.ac b/configure.ac
index d2bf5959881..d7f96fd11f4 100644
--- a/configure.ac
+++ b/configure.ac
@@ -391,25 +391,6 @@ if test "x$enable_cephfs_java" = "xyes"; then
 fi
 AM_CONDITIONAL(HAVE_JUNIT4, [test "$have_junit4" = "1"])
 
-# jni?
-# clear cache (from java above) -- this whole thing will get
-# folded into the bigger java package later -- for now maintain
-# backward compat
-AS_UNSET(ac_cv_header_jni_h)
-AC_ARG_WITH([hadoop],
-            [AS_HELP_STRING([--with-hadoop], [build hadoop client])],
-            [],
-            [with_hadoop=check])
-AS_IF([test "x$with_hadoop" != xno],
-      [AC_CHECK_HEADER([jni.h],
-                       [HAVE_JNI=1],
-                       [if test "x$with_hadoop" != xcheck; then
-                          AC_MSG_FAILURE(
-                            [--with-hadoop was given but jni.h not found])
-                        fi
-                       ])])
-AM_CONDITIONAL(WITH_HADOOPCLIENT, [test "$HAVE_JNI" = "1"])
-
 #
 # FreeBSD has it in base.
 #
diff --git a/debian/copyright b/debian/copyright
index aa91a149853..d11a0f7f5da 100644
--- a/debian/copyright
+++ b/debian/copyright
@@ -7,10 +7,6 @@ Files: *
 Copyright: (c) 2004-2010 by Sage Weil <sage@newdream.net>
 License: LGPL2.1 (see /usr/share/common-licenses/LGPL-2.1)
 
-Files: src/client/hadoop/ceph
-Copyright: Copyright (C) New Dream Network and contributors
-License: Apache License v2
-
 Files: src/mount/canonicalize.c
 Copyright: Copyright (C) 1993 Rick Sladkey <jrs@world.std.com>
 License: LGPL2 or later
diff --git a/do_autogen.sh b/do_autogen.sh
index 32e9df4623b..bc6749e9e5d 100755
--- a/do_autogen.sh
+++ b/do_autogen.sh
@@ -10,7 +10,6 @@ do_autogen.sh: make a ceph build by running autogen, etc.
    level 1: -g
    level 3: -Wextra
    level 4: even more...
--H --with-hadoop
 -T --without-tcmalloc
 -e <path> dump encoded objects to <path>
 -P profiling build
@@ -46,8 +45,6 @@ do
     h) usage
        exit 0;;
 
-    H) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --with-hadoop";;
-
     T) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --without-tcmalloc";;
 
     j) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --enable-cephfs-java";;
diff --git a/doc/rados/troubleshooting/log-and-debug.rst b/doc/rados/troubleshooting/log-and-debug.rst
index 2f2e5e4abc0..7d1ea43d1db 100644
--- a/doc/rados/troubleshooting/log-and-debug.rst
+++ b/doc/rados/troubleshooting/log-and-debug.rst
@@ -243,7 +243,7 @@ to their default level or to a level suitable for normal operations.
 +--------------------+-----------+--------------+
 | ``rgw``            | 1         | 5            |
 +--------------------+-----------+--------------+
-| ``hadoop``         | 1         | 5            |
+| ``javaclient``     | 1         | 5            |
 +--------------------+-----------+--------------+
 | ``asok``           | 1         | 5            |
 +--------------------+-----------+--------------+
diff --git a/src/Makefile.am b/src/Makefile.am
index 3520ca8d8f9..c0ed016006d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -96,25 +96,13 @@ bin_PROGRAMS += rbd-fuse
 endif # WITH_FUSE
 
-# libcephfs (this and libhadoopcephfs should go somewhere else in the future)
+# libcephfs (this should go somewhere else in the future)
 libcephfs_la_SOURCES = libcephfs.cc
 libcephfs_la_LIBADD = $(LIBCLIENT) $(LIBCOMMON) $(PTHREAD_LIBS) $(CRYPTO_LIBS) $(EXTRALIBS)
 libcephfs_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex '^ceph_.*'
 lib_LTLIBRARIES += libcephfs.la
 
-# order matters! this *must* come after libcephfs, or else you'll encounter something
-# like "error: relink libhadoopcephfs.la with the above command before installing it"
-if WITH_HADOOPCLIENT
-JAVA_BASE = /usr/lib/jvm/java-6-sun
-libhadoopcephfs_la_SOURCES = client/hadoop/CephFSInterface.cc
-libhadoopcephfs_la_LIBADD = $(LIBCEPHFS)
-libhadoopcephfs_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex 'hadoopcephfs_.*'
-lib_LTLIBRARIES += libhadoopcephfs.la
-noinst_HEADERS += client/hadoop/CephFSInterface.h
-endif # WITH_HADOOPCLIENT
-
-
 # jni library (java source is in src/java)
 
 if ENABLE_CEPHFS_JAVA
diff --git a/src/client/hadoop/CephFSInterface.cc b/src/client/hadoop/CephFSInterface.cc
deleted file mode 100644
index d5a3c8f4fcd..00000000000
--- a/src/client/hadoop/CephFSInterface.cc
+++ /dev/null
@@ -1,993 +0,0 @@
-// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-#include "CephFSInterface.h"
-#include "include/cephfs/libcephfs.h"
-#include "common/ceph_argparse.h"
-#include "common/config.h"
-#include "msg/SimpleMessenger.h"
-
-#include <arpa/inet.h>
-#include <sys/stat.h>
-#include <sys/statvfs.h>
-
-#define dout_subsys ceph_subsys_hadoop
-
-union ceph_mount_union_t {
-  struct ceph_mount_info *cmount;
-  jlong cjlong;
-};
-
-static void set_ceph_mount_info(JNIEnv *env, jobject obj, struct ceph_mount_info *cmount)
-{
-  jclass cls = env->GetObjectClass(obj);
-  if (cls == NULL)
-    return;
-  jfieldID fid = env->GetFieldID(cls, "cluster", "J");
-  if (fid == NULL)
-    return;
-  ceph_mount_union_t ceph_mount_union;
-  ceph_mount_union.cjlong = 0;
-  ceph_mount_union.cmount = cmount;
-  env->SetLongField(obj, fid, ceph_mount_union.cjlong);
-}
-
-static struct ceph_mount_info *get_ceph_mount_t(JNIEnv *env, jobject obj)
-{
-  jclass cls = env->GetObjectClass(obj);
-  jfieldID fid = env->GetFieldID(cls, "cluster", "J");
-  if (fid == NULL)
-    return NULL;
-  ceph_mount_union_t ceph_mount_union;
-  ceph_mount_union.cjlong = env->GetLongField(obj, fid);
-  return ceph_mount_union.cmount;
-}
-
-/*
- * Class:     org_apache_hadoop_fs_ceph_CephTalker
- * Method:    ceph_initializeClient
- * Signature: (Ljava/lang/String;I)Z
- *
- * Performs any necessary setup to allow general use of the filesystem.
- * Inputs: - * jstring args -- a command-line style input of Ceph config params - * jint block_size -- the size in bytes to use for blocks - * Returns: true on success, false otherwise - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient - (JNIEnv *env, jobject obj, jstring j_args, jint block_size) -{ - // Convert Java argument string to argv - const char *c_args = env->GetStringUTFChars(j_args, 0); - if (c_args == NULL) - return false; //out of memory! - string cppargs(c_args); - char b[cppargs.length()+1]; - strcpy(b, cppargs.c_str()); - env->ReleaseStringUTFChars(j_args, c_args); - std::vector<const char*> args; - char *p = b; - while (*p) { - args.push_back(p); - while (*p && *p != ' ') - p++; - if (!*p) - break; - *p++ = 0; - while (*p && *p == ' ') - p++; - } - - // parse the arguments - bool set_local_writes = false; - std::string mount_root, val; - for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) { - if (ceph_argparse_witharg(args, i, &val, "mount_root", (char*)NULL)) { - mount_root = val; - } else if (ceph_argparse_flag(args, i, "set_local_pg", (char*)NULL)) { - set_local_writes = true; - } else { - ++i; - } - } - - // connect to the cmount - struct ceph_mount_info *cmount; - int ret = ceph_create(&cmount, NULL); - if (ret) - return false; - ceph_conf_read_file(cmount, NULL); // read config file from the default location - ceph_conf_parse_argv(cmount, args.size(), &args[0]); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 3) << "CephFSInterface: mounting filesystem...:" << dendl; - - ret = ceph_mount(cmount, mount_root.c_str()); - if (ret) - return false; - - ceph_localize_reads(cmount, true); - ceph_set_default_file_stripe_unit(cmount, block_size); - ceph_set_default_object_size(cmount, block_size); - - if (set_local_writes) { - ceph_set_default_preferred_pg(cmount, ceph_get_local_osd(cmount)); - } - - set_ceph_mount_info(env, obj, cmount); - return true; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getcwd - * Signature: (J)Ljava/lang/String; - * - * Returns the current working directory.(absolute) as a jstring - */ -JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd - (JNIEnv *env, jobject obj) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In getcwd" << dendl; - jstring j_path = env->NewStringUTF(ceph_getcwd(cmount)); - return j_path; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setcwd - * Signature: (Ljava/lang/String;)Z - * - * Changes the working directory. - * Inputs: - * jstring j_path: The path (relative or absolute) to switch to - * Returns: true on success, false otherwise. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd -(JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In setcwd" << dendl; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - int ret = ceph_chdir(cmount, c_path); - env->ReleaseStringUTFChars(j_path, c_path); - return ret ? JNI_FALSE : JNI_TRUE; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_rmdir - * Signature: (Ljava/lang/String;)Z - * - * Given a path to a directory, removes the directory.if empty. 
- * Inputs: - * jstring j_path: The path (relative or absolute) to the directory - * Returns: true on successful delete; false otherwise - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In rmdir" << dendl; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if(c_path == NULL) - return false; - int ret = ceph_rmdir(cmount, c_path); - env->ReleaseStringUTFChars(j_path, c_path); - return ret ? JNI_FALSE : JNI_TRUE; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_unlink - * Signature: (Ljava/lang/String;)Z - * Given a path, unlinks it. - * Inputs: - * jstring j_path: The path (relative or absolute) to the file or empty dir - * Returns: true if the unlink occurred, false otherwise. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - ldout(cct, 10) << "CephFSInterface: In unlink for path " << c_path << ":" << dendl; - int ret = ceph_unlink(cmount, c_path); - env->ReleaseStringUTFChars(j_path, c_path); - return ret ? JNI_FALSE : JNI_TRUE; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_rename - * Signature: (Ljava/lang/String;Ljava/lang/String;)Z - * Changes a given path name to a new name. - * Inputs: - * jstring j_from: The path whose name you want to change. - * jstring j_to: The new name for the path. - * Returns: true if the rename occurred, false otherwise - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename - (JNIEnv *env, jobject obj, jstring j_from, jstring j_to) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In rename" << dendl; - const char *c_from = env->GetStringUTFChars(j_from, 0); - if (c_from == NULL) - return false; - const char *c_to = env->GetStringUTFChars(j_to, 0); - if (c_to == NULL) { - env->ReleaseStringUTFChars(j_from, c_from); - return false; - } - struct stat stbuf; - int ret = ceph_lstat(cmount, c_to, &stbuf); - if (ret != -ENOENT) { - // Hadoop doesn't want to overwrite files in a rename. - env->ReleaseStringUTFChars(j_from, c_from); - env->ReleaseStringUTFChars(j_to, c_to); - return JNI_FALSE; - } - - ret = ceph_rename(cmount, c_from, c_to); - return ret ? JNI_FALSE : JNI_TRUE; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_exists - * Signature: (Ljava/lang/String;)Z - * Returns true if it the input path exists, false - * if it does not or there is an unexpected failure. 
- */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists -(JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In exists" << dendl; - - struct stat stbuf; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - ldout(cct, 10) << "Attempting lstat with file " << c_path << ":" << dendl; - - int ret = ceph_lstat(cmount, c_path, &stbuf); - ldout(cct, 10) << "result is " << ret << dendl; - env->ReleaseStringUTFChars(j_path, c_path); - if (ret < 0) { - ldout(cct, 10) << "Returning false (file does not exist)" << dendl; - return JNI_FALSE; - } - else { - ldout(cct, 10) << "Returning true (file exists)" << dendl; - return JNI_TRUE; - } -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getblocksize - * Signature: (Ljava/lang/String;)J - * Get the block size for a given path. - * Input: - * j_string j_path: The path (relative or absolute) you want - * the block size for. - * Returns: block size (as a long) if the path exists, otherwise a negative - * number corresponding to the standard C++ error codes (which are positive). - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In getblocksize" << dendl; - - //struct stat stbuf; - - jlong result; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - // we need to open the file to retrieve the stripe size - ldout(cct, 10) << "CephFSInterface: getblocksize: opening file" << dendl; - int fh = ceph_open(cmount, c_path, O_RDONLY, 0); - env->ReleaseStringUTFChars(j_path, c_path); - if (fh < 0) - return fh; - - result = ceph_get_file_stripe_unit(cmount, fh); - - int close_result = ceph_close(cmount, fh); - if (close_result < 0) - return close_result; - - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_isfile - * Signature: (Ljava/lang/String;)Z - * Returns true if the given path is a file; false otherwise. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In isfile" << dendl; - - struct stat stbuf; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - int ret = ceph_lstat(cmount, c_path, &stbuf); - env->ReleaseStringUTFChars(j_path, c_path); - - // if the stat call failed, it's definitely not a file... - if (ret < 0) - return false; - - // check the stat result - return !!S_ISREG(stbuf.st_mode); -} - - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_isdirectory - * Signature: (Ljava/lang/String;)Z - * Returns true if the given path is a directory, false otherwise. 
- */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In isdirectory" << dendl; - - struct stat stbuf; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - int result = ceph_lstat(cmount, c_path, &stbuf); - env->ReleaseStringUTFChars(j_path, c_path); - - // if the stat call failed, it's definitely not a directory... - if (result < 0) - return JNI_FALSE; - - // check the stat result - return !!S_ISDIR(stbuf.st_mode); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getdir - * Signature: (Ljava/lang/String;)[Ljava/lang/String; - * Get the contents of a given directory. - * Inputs: - * jstring j_path: The path (relative or absolute) to the directory. - * Returns: A Java String[] of the contents of the directory, or - * NULL if there is an error (ie, path is not a dir). This listing - * will not contain . or .. entries. - */ -JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir -(JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In getdir" << dendl; - - // get the directory listing - list<string> contents; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return NULL; - struct ceph_dir_result *dirp; - int r; - r = ceph_opendir(cmount, c_path, &dirp); - if (r<0) { - env->ReleaseStringUTFChars(j_path, c_path); - return NULL; - } - int buflen = 100; //good default? - char *buf = new char[buflen]; - string *ent; - int bufpos; - while (1) { - r = ceph_getdnames(cmount, dirp, buf, buflen); - if (r==-ERANGE) { //expand the buffer - delete [] buf; - buflen *= 2; - buf = new char[buflen]; - continue; - } - if (r<=0) break; - - //if we make it here, we got at least one name - bufpos = 0; - while (bufpos<r) {//make new strings and add them to listing - ent = new string(buf+bufpos); - if (ent->compare(".") && ent->compare("..")) - //we DON'T want to include dot listings; Hadoop gets confused - contents.push_back(*ent); - bufpos+=ent->size()+1; - delete ent; - } - } - delete [] buf; - ceph_closedir(cmount, dirp); - env->ReleaseStringUTFChars(j_path, c_path); - - if (r < 0) return NULL; - - // Create a Java String array of the size of the directory listing - jclass stringClass = env->FindClass("java/lang/String"); - if (stringClass == NULL) { - ldout(cct, 0) << "ERROR: java String class not found; dying a horrible, painful death" << dendl; - assert(0); - } - jobjectArray dirListingStringArray = (jobjectArray) env->NewObjectArray(contents.size(), stringClass, NULL); - if(dirListingStringArray == NULL) return NULL; - - // populate the array with the elements of the directory list - int i = 0; - for (list<string>::iterator it = contents.begin(); - it != contents.end(); - ++it) { - env->SetObjectArrayElement(dirListingStringArray, i, - env->NewStringUTF(it->c_str())); - ++i; - } - - return dirListingStringArray; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_mkdirs - * Signature: (Ljava/lang/String;I)I - * Create the specified directory and any required intermediate ones with the - * given mode. 
- */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs -(JNIEnv *env, jobject obj, jstring j_path, jint mode) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In Hadoop mk_dirs" << dendl; - - //get c-style string and make the call, clean up the string... - jint result; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - result = ceph_mkdirs(cmount, c_path, mode); - env->ReleaseStringUTFChars(j_path, c_path); - - //...and return - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_append - * Signature: (Ljava/lang/String;)I - * Open a file to append. If the file does not exist, it will be created. - * Opening a dir is possible but may have bad results. - * Inputs: - * jstring j_path: The path to open. - * Returns: a jint filehandle, or a number<0 if an error occurs. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append -(JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In hadoop open_for_append" << dendl; - - jint result; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_APPEND, 0); - env->ReleaseStringUTFChars(j_path, c_path); - - return result; -} - - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_read - * Signature: (Ljava/lang/String;)I - * Open a file for reading. - * Opening a dir is possible but may have bad results. - * Inputs: - * jstring j_path: The path to open. - * Returns: a jint filehandle, or a number<0 if an error occurs. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In open_for_read" << dendl; - - jint result; - - // open as read-only: flag = O_RDONLY - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - result = ceph_open(cmount, c_path, O_RDONLY, 0); - env->ReleaseStringUTFChars(j_path, c_path); - - // returns file handle, or -1 on failure - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_overwrite - * Signature: (Ljava/lang/String;)I - * Opens a file for overwriting; creates it if necessary. - * Opening a dir is possible but may have bad results. - * Inputs: - * jstring j_path: The path to open. - * jint mode: The mode to open with. - * Returns: a jint filehandle, or a number<0 if an error occurs. 
- */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite - (JNIEnv *env, jobject obj, jstring j_path, jint mode) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In open_for_overwrite" << dendl; - - jint result; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_TRUNC, mode); - env->ReleaseStringUTFChars(j_path, c_path); - - // returns file handle, or -1 on failure - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_close - * Signature: (I)I - * Closes a given filehandle. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close -(JNIEnv *env, jobject obj, jint fh) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In CephTalker::ceph_close" << dendl; - return ceph_close(cmount, fh); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setPermission - * Signature: (Ljava/lang/String;I)Z - * Change the mode on a path. - * Inputs: - * jstring j_path: The path to change mode on. - * jint j_new_mode: The mode to apply. - * Returns: true if the mode is properly applied, false if there - * is any error. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission -(JNIEnv *env, jobject obj, jstring j_path, jint j_new_mode) -{ - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - int result = ceph_chmod(cmount, c_path, j_new_mode); - env->ReleaseStringUTFChars(j_path, c_path); - - return (result==0); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_kill_client - * Signature: (J)Z - * - * Closes the Ceph client. This should be called before shutting down - * (multiple times is okay but redundant). - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client - (JNIEnv *env, jobject obj) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - if (!cmount) - return true; - ceph_shutdown(cmount); - set_ceph_mount_info(env, obj, NULL); - return true; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_stat - * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z - * Get the statistics on a path returned in a custom format defined - * in CephTalker. - * Inputs: - * jstring j_path: The path to stat. - * jobject j_stat: The stat object to fill. - * Returns: true if the stat is successful, false otherwise. 
- */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat -(JNIEnv *env, jobject obj, jstring j_path, jobject j_stat) -{ - //setup variables - struct stat st; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return false; - - jclass cls = env->GetObjectClass(j_stat); - if (cls == NULL) return false; - jfieldID c_size_id = env->GetFieldID(cls, "size", "J"); - if (c_size_id == NULL) return false; - jfieldID c_dir_id = env->GetFieldID(cls, "is_dir", "Z"); - if (c_dir_id == NULL) return false; - jfieldID c_block_id = env->GetFieldID(cls, "block_size", "J"); - if (c_block_id == NULL) return false; - jfieldID c_mod_id = env->GetFieldID(cls, "mod_time", "J"); - if (c_mod_id == NULL) return false; - jfieldID c_access_id = env->GetFieldID(cls, "access_time", "J"); - if (c_access_id == NULL) return false; - jfieldID c_mode_id = env->GetFieldID(cls, "mode", "I"); - if (c_mode_id == NULL) return false; - //do actual lstat - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - int r = ceph_lstat(cmount, c_path, &st); - env->ReleaseStringUTFChars(j_path, c_path); - - if (r < 0) return false; //fail out; file DNE or Ceph broke - - //put variables from struct stat into Java - env->SetLongField(j_stat, c_size_id, st.st_size); - env->SetBooleanField(j_stat, c_dir_id, (0 != S_ISDIR(st.st_mode))); - env->SetLongField(j_stat, c_block_id, st.st_blksize); - - long long java_mtime(st.st_mtim.tv_sec); - java_mtime *= 1000; - java_mtime += st.st_mtim.tv_nsec / 1000; - env->SetLongField(j_stat, c_mod_id, java_mtime); - - long long java_atime(st.st_atim.tv_sec); - java_atime *= 1000; - java_atime += st.st_atim.tv_nsec / 1000; - env->SetLongField(j_stat, c_access_id, java_atime); - - env->SetIntField(j_stat, c_mode_id, (int)st.st_mode); - - //return happy - return true; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_statfs - * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I - * Statfs a filesystem in a custom format defined in CephTalker. - * Inputs: - * jstring j_path: A path on the filesystem that you wish to stat. - * jobject j_ceph_stat: The CephStat object to fill. - * Returns: true if successful and the CephStat is filled; false otherwise. 
- */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs -(JNIEnv *env, jobject obj, jstring j_path, jobject j_cephstat) -{ - //setup variables - struct statvfs stbuf; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - jclass cls = env->GetObjectClass(j_cephstat); - if (cls == NULL) - return 1; //JVM error of some kind - jfieldID c_capacity_id = env->GetFieldID(cls, "capacity", "J"); - jfieldID c_used_id = env->GetFieldID(cls, "used", "J"); - jfieldID c_remaining_id = env->GetFieldID(cls, "remaining", "J"); - - //do the statfs - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - int r = ceph_statfs(cmount, c_path, &stbuf); - env->ReleaseStringUTFChars(j_path, c_path); - if (r != 0) - return r; //something broke - - //place info into Java; convert from bytes to kilobytes - env->SetLongField(j_cephstat, c_capacity_id, - (long)stbuf.f_blocks*stbuf.f_bsize/1024); - env->SetLongField(j_cephstat, c_used_id, - (long)(stbuf.f_blocks-stbuf.f_bavail)*stbuf.f_bsize/1024); - env->SetLongField(j_cephstat, c_remaining_id, - (long)stbuf.f_bavail*stbuf.f_bsize/1024); - return r; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_replication - * Signature: (Ljava/lang/String;)I - * Check how many times a path should be replicated (if it is - * degraded it may not actually be replicated this often). - * Inputs: - * jstring j_path: The path to check. - * Returns: an int containing the number of times replicated. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication -(JNIEnv *env, jobject obj, jstring j_path) -{ - //get c-string of path, send off to libceph, release c-string, return - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - int fh = 0; - fh = ceph_open(cmount, c_path, O_RDONLY, 0); - env->ReleaseStringUTFChars(j_path, c_path); - if (fh < 0) { - return fh; - } - int replication = ceph_get_file_replication(cmount, fh); - ceph_close(cmount, fh); - return replication; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_hosts - * Signature: (IJ)[Ljava/lang/String; - * Find the IP:port addresses of the primary OSD for a given file and offset. - * Inputs: - * jint j_fh: The filehandle for the file. - * jlong j_offset: The offset to get the location of. - * Returns: a jstring of the location as IP, or NULL if there is an error. 
- */ -JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts -(JNIEnv *env, jobject obj, jint j_fh, jlong j_offset) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - struct sockaddr_storage *ss; - char address[30]; - jobjectArray addr_array; - jclass string_cls; - jstring j_addr; - int r, n = 3; /* initial guess at # of replicas */ - - for (;;) { - ss = new struct sockaddr_storage[n]; - r = ceph_get_file_stripe_address(cmount, j_fh, j_offset, ss, n); - if (r < 0) { - if (r == -ERANGE) { - delete [] ss; - n *= 2; - continue; - } - return NULL; - } - n = r; - break; - } - - /* TODO: cache this */ - string_cls = env->FindClass("java/lang/String"); - if (!string_cls) - goto out; - - addr_array = env->NewObjectArray(n, string_cls, NULL); - if (!addr_array) - goto out; - - for (r = 0; r < n; r++) { - /* Hadoop only deals with IPv4 */ - if (ss[r].ss_family != AF_INET) - goto out; - - memset(address, 0, sizeof(address)); - - inet_ntop(ss[r].ss_family, &((struct sockaddr_in *)&ss[r])->sin_addr, - address, sizeof(address)); - - j_addr = env->NewStringUTF(address); - - env->SetObjectArrayElement(addr_array, r, j_addr); - if (env->ExceptionOccurred()) - goto out; - - env->DeleteLocalRef(j_addr); - } - - delete [] ss; - return addr_array; - -out: - delete [] ss; - return NULL; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setTimes - * Signature: (Ljava/lang/String;JJ)I - * Set the mtime and atime for a given path. - * Inputs: - * jstring j_path: The path to set the times for. - * jlong mtime: The mtime to set, in millis since epoch (-1 to not set). - * jlong atime: The atime to set, in millis since epoch (-1 to not set) - * Returns: 0 if successful, an error code otherwise. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes -(JNIEnv *env, jobject obj, jstring j_path, jlong mtime, jlong atime) -{ - const char *c_path = env->GetStringUTFChars(j_path, 0); - if(c_path == NULL) return -ENOMEM; - - //build the mask for ceph_setattr - int mask = 0; - if (mtime!=-1) mask = CEPH_SETATTR_MTIME; - if (atime!=-1) mask |= CEPH_SETATTR_ATIME; - //build a struct stat and fill it in! - //remember to convert from millis to seconds and microseconds - struct stat attr; - attr.st_mtim.tv_sec = mtime / 1000; - attr.st_mtim.tv_nsec = (mtime % 1000) * 1000000; - attr.st_atim.tv_sec = atime / 1000; - attr.st_atim.tv_nsec = (atime % 1000) * 1000000; - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - return ceph_setattr(cmount, c_path, &attr, mask); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_read - * Signature: (JI[BII)I - * Reads into the given byte array from the current position. - * Inputs: - * jint fh: the filehandle to read from - * jbyteArray j_buffer: the byte array to read into - * jint buffer_offset: where in the buffer to start writing - * jint length: how much to read. - * There'd better be enough space in the buffer to write all - * the data from the given offset! - * Returns: the number of bytes read on success (as jint), - * or an error code otherwise. 
- */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read - (JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In read" << dendl; - - - // Make sure to convert the Hadoop read arguments into a - // more ceph-friendly form - jint result; - - // Step 1: get a pointer to the buffer. - jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL); - if (j_buffer_ptr == NULL) return -ENOMEM; - char *c_buffer = (char*) j_buffer_ptr; - - // Step 2: pointer arithmetic to start in the right buffer position - c_buffer += (int)buffer_offset; - - // Step 3: do the read - result = ceph_read(cmount, (int)fh, c_buffer, length, -1); - - // Step 4: release the pointer to the buffer - env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0); - - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_seek_from_start - * Signature: (JIJ)J - * Seeks to the given position in the given file. - * Inputs: - * jint fh: The filehandle to seek in. - * jlong pos: The position to seek to. - * Returns: the new position (as a jlong) of the filehandle on success, - * or a negative error code on failure. - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start - (JNIEnv *env, jobject obj, jint fh, jlong pos) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In CephTalker::seek_from_start" << dendl; - return ceph_lseek(cmount, fh, pos, SEEK_SET); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getpos - * Signature: (I)J - * - * Get the current position in a file (as a jlong) of a given filehandle. - * Returns: jlong current file position on success, or a - * negative error code on failure. - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos - (JNIEnv *env, jobject obj, jint fh) -{ - // seek a distance of 0 to get current offset - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In CephTalker::ceph_getpos" << dendl; - return ceph_lseek(cmount, fh, 0, SEEK_CUR); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_write - * Signature: (I[BII)I - * Write the given buffer contents to the given filehandle. - * Inputs: - * jint fh: The filehandle to write to. - * jbyteArray j_buffer: The buffer to write from - * jint buffer_offset: The position in the buffer to write from - * jint length: The number of (sequential) bytes to write. - * Returns: jint, on success the number of bytes written, on failure - * a negative error code. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write - (JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In write" << dendl; - - // IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we - // have to convert. The write is *always* from the current position in the file, - // and buffer_offset is the location in the *buffer* where we start writing. - jint result; - - // Step 1: get a pointer to the buffer. 
- jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL); - if (j_buffer_ptr == NULL) - return -ENOMEM; - char *c_buffer = (char*) j_buffer_ptr; - - // Step 2: pointer arithmetic to start in the right buffer position - c_buffer += (int)buffer_offset; - - // Step 3: do the write - result = ceph_write(cmount, (int)fh, c_buffer, length, -1); - - // Step 4: release the pointer to the buffer - env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0); - - return result; -} diff --git a/src/client/hadoop/CephFSInterface.h b/src/client/hadoop/CephFSInterface.h deleted file mode 100644 index 6939b3a501d..00000000000 --- a/src/client/hadoop/CephFSInterface.h +++ /dev/null @@ -1,236 +0,0 @@ -// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -/* BE CAREFUL EDITING THIS FILE - it is a compilation of JNI - machine-generated headers */ - -#include <jni.h> - -#ifndef _Included_org_apache_hadoop_fs_ceph_CephTalker -#define _Included_org_apache_hadoop_fs_ceph_CephTalker -#ifdef __cplusplus -extern "C" { -#endif - //these constants are machine-generated to match Java constants in the source -#undef org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE -#define org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE 8388608LL -#undef org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE -#define org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE 2048L -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_initializeClient - * Signature: (Ljava/lang/String;I)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient - (JNIEnv *, jobject, jstring, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getcwd - * Signature: ()Ljava/lang/String; - */ -JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd - (JNIEnv *, jobject); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setcwd - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_rmdir - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_unlink - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_rename - * Signature: (Ljava/lang/String;Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename - (JNIEnv *, jobject, jstring, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_exists - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getblocksize - * Signature: (Ljava/lang/String;)J - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_isdirectory - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory - 
(JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_isfile - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getdir - * Signature: (Ljava/lang/String;)[Ljava/lang/String; - */ -JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_mkdirs - * Signature: (Ljava/lang/String;I)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs - (JNIEnv *, jobject, jstring, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_append - * Signature: (Ljava/lang/String;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_read - * Signature: (Ljava/lang/String;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_overwrite - * Signature: (Ljava/lang/String;I)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite - (JNIEnv *, jobject, jstring, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_close - * Signature: (I)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close - (JNIEnv *, jobject, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setPermission - * Signature: (Ljava/lang/String;I)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission - (JNIEnv *, jobject, jstring, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_kill_client - * Signature: ()Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client - (JNIEnv *, jobject); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_stat - * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat - (JNIEnv *, jobject, jstring, jobject); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_statfs - * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs -(JNIEnv * env, jobject obj, jstring j_path, jobject j_cephstat); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_replication - * Signature: (Ljava/lang/String;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_hosts - * Signature: (IJ)[Ljava/lang/String; - */ -JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts - (JNIEnv *, jobject, jint, jlong); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setTimes - * Signature: (Ljava/lang/String;JJ)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes - (JNIEnv *, jobject, jstring, jlong, jlong); - -/* - * 
Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_read - * Signature: (I[BII)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read - (JNIEnv *, jobject, jint, jbyteArray, jint, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_seek_from_start - * Signature: (IJ)J - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start - (JNIEnv *, jobject, jint, jlong); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getpos - * Signature: (I)J - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos - (JNIEnv *, jobject, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_write - * Signature: (I[BII)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write - (JNIEnv *, jobject, jint, jbyteArray, jint, jint); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/src/client/hadoop/HADOOP-ceph.patch b/src/client/hadoop/HADOOP-ceph.patch deleted file mode 100644 index 84cdb370f77..00000000000 --- a/src/client/hadoop/HADOOP-ceph.patch +++ /dev/null @@ -1,2234 +0,0 @@ -diff --git a/src/core/core-default.xml b/src/core/core-default.xml -index 8bc3b99..26543bc 100644 ---- a/src/core/core-default.xml -+++ b/src/core/core-default.xml -@@ -210,6 +210,12 @@ - </property> - - <property> -+ <name>fs.ceph.impl</name> -+ <value>org.apache.hadoop.fs.ceph.CephFileSystem</value> -+ <description>The file system for ceph: uris.</description> -+</property> -+ -+<property> - <name>fs.har.impl.disable.cache</name> - <value>true</value> - <description>Don't cache 'har' filesystem instances.</description> -diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFS.java b/src/core/org/apache/hadoop/fs/ceph/CephFS.java -new file mode 100644 -index 0000000..5d51eb2 ---- /dev/null -+++ b/src/core/org/apache/hadoop/fs/ceph/CephFS.java -@@ -0,0 +1,250 @@ -+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- -+ -+/** -+ * -+ * Licensed under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -+ * implied. See the License for the specific language governing -+ * permissions and limitations under the License. -+ * -+ * -+ * Abstract base class for communicating with a Ceph filesystem and its -+ * C++ codebase from Java, or pretending to do so (for unit testing purposes). -+ * As only the Ceph package should be using this directly, all methods -+ * are protected. -+ */ -+package org.apache.hadoop.fs.ceph; -+ -+import org.apache.hadoop.conf.Configuration; -+ -+abstract class CephFS { -+ -+ protected static final int ENOTDIR = 20; -+ protected static final int EEXIST = 17; -+ protected static final int ENOENT = 2; -+ -+ /* -+ * Performs any necessary setup to allow general use of the filesystem. 
-+ * Inputs: -+ * String argsuments -- a command-line style input of Ceph config params -+ * int block_size -- the size in bytes to use for blocks -+ * Returns: true on success, false otherwise -+ */ -+ abstract protected boolean ceph_initializeClient(String arguments, int block_size); -+ -+ /* -+ * Returns the current working directory (absolute) as a String -+ */ -+ abstract protected String ceph_getcwd(); -+ -+ /* -+ * Changes the working directory. -+ * Inputs: -+ * String path: The path (relative or absolute) to switch to -+ * Returns: true on success, false otherwise. -+ */ -+ abstract protected boolean ceph_setcwd(String path); -+ -+ /* -+ * Given a path to a directory, removes the directory if empty. -+ * Inputs: -+ * jstring j_path: The path (relative or absolute) to the directory -+ * Returns: true on successful delete; false otherwise -+ */ -+ abstract protected boolean ceph_rmdir(String path); -+ -+ /* -+ * Given a path, unlinks it. -+ * Inputs: -+ * String path: The path (relative or absolute) to the file or empty dir -+ * Returns: true if the unlink occurred, false otherwise. -+ */ -+ abstract protected boolean ceph_unlink(String path); -+ -+ /* -+ * Changes a given path name to a new name, assuming new_path doesn't exist. -+ * Inputs: -+ * jstring j_from: The path whose name you want to change. -+ * jstring j_to: The new name for the path. -+ * Returns: true if the rename occurred, false otherwise -+ */ -+ abstract protected boolean ceph_rename(String old_path, String new_path); -+ -+ /* -+ * Returns true if it the input path exists, false -+ * if it does not or there is an unexpected failure. -+ */ -+ abstract protected boolean ceph_exists(String path); -+ -+ /* -+ * Get the block size for a given path. -+ * Input: -+ * String path: The path (relative or absolute) you want -+ * the block size for. -+ * Returns: block size if the path exists, otherwise a negative number -+ * corresponding to the standard C++ error codes (which are positive). -+ */ -+ abstract protected long ceph_getblocksize(String path); -+ -+ /* -+ * Returns true if the given path is a directory, false otherwise. -+ */ -+ abstract protected boolean ceph_isdirectory(String path); -+ -+ /* -+ * Returns true if the given path is a file; false otherwise. -+ */ -+ abstract protected boolean ceph_isfile(String path); -+ -+ /* -+ * Get the contents of a given directory. -+ * Inputs: -+ * String path: The path (relative or absolute) to the directory. -+ * Returns: A Java String[] of the contents of the directory, or -+ * NULL if there is an error (ie, path is not a dir). This listing -+ * will not contain . or .. entries. -+ */ -+ abstract protected String[] ceph_getdir(String path); -+ -+ /* -+ * Create the specified directory and any required intermediate ones with the -+ * given mode. -+ */ -+ abstract protected int ceph_mkdirs(String path, int mode); -+ -+ /* -+ * Open a file to append. If the file does not exist, it will be created. -+ * Opening a dir is possible but may have bad results. -+ * Inputs: -+ * String path: The path to open. -+ * Returns: an int filehandle, or a number<0 if an error occurs. -+ */ -+ abstract protected int ceph_open_for_append(String path); -+ -+ /* -+ * Open a file for reading. -+ * Opening a dir is possible but may have bad results. -+ * Inputs: -+ * String path: The path to open. -+ * Returns: an int filehandle, or a number<0 if an error occurs. -+ */ -+ abstract protected int ceph_open_for_read(String path); -+ -+ /* -+ * Opens a file for overwriting; creates it if necessary. 
-+ * Opening a dir is possible but may have bad results. -+ * Inputs: -+ * String path: The path to open. -+ * int mode: The mode to open with. -+ * Returns: an int filehandle, or a number<0 if an error occurs. -+ */ -+ abstract protected int ceph_open_for_overwrite(String path, int mode); -+ -+ /* -+ * Closes the given file. Returns 0 on success, or a negative -+ * error code otherwise. -+ */ -+ abstract protected int ceph_close(int filehandle); -+ -+ /* -+ * Change the mode on a path. -+ * Inputs: -+ * String path: The path to change mode on. -+ * int mode: The mode to apply. -+ * Returns: true if the mode is properly applied, false if there -+ * is any error. -+ */ -+ abstract protected boolean ceph_setPermission(String path, int mode); -+ -+ /* -+ * Closes the Ceph client. This should be called before shutting down -+ * (multiple times is okay but redundant). -+ */ -+ abstract protected boolean ceph_kill_client(); -+ -+ /* -+ * Get the statistics on a path returned in a custom format defined -+ * in CephFileSystem. -+ * Inputs: -+ * String path: The path to stat. -+ * Stat fill: The stat object to fill. -+ * Returns: true if the stat is successful, false otherwise. -+ */ -+ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill); -+ -+ /* -+ * Check how many times a file should be replicated. If it is, -+ * degraded it may not actually be replicated this often. -+ * Inputs: -+ * int fh: a file descriptor -+ * Returns: an int containing the number of times replicated. -+ */ -+ abstract protected int ceph_replication(String path); -+ -+ /* -+ * Find the IP address of the primary OSD for a given file and offset. -+ * Inputs: -+ * int fh: The filehandle for the file. -+ * long offset: The offset to get the location of. -+ * Returns: an array of String of the location as IP, or NULL if there is an error. -+ */ -+ abstract protected String[] ceph_hosts(int fh, long offset); -+ -+ /* -+ * Set the mtime and atime for a given path. -+ * Inputs: -+ * String path: The path to set the times for. -+ * long mtime: The mtime to set, in millis since epoch (-1 to not set). -+ * long atime: The atime to set, in millis since epoch (-1 to not set) -+ * Returns: 0 if successful, an error code otherwise. -+ */ -+ abstract protected int ceph_setTimes(String path, long mtime, long atime); -+ -+ /* -+ * Get the current position in a file (as a long) of a given filehandle. -+ * Returns: (long) current file position on success, or a -+ * negative error code on failure. -+ */ -+ abstract protected long ceph_getpos(int fh); -+ -+ /* -+ * Write the given buffer contents to the given filehandle. -+ * Inputs: -+ * int fh: The filehandle to write to. -+ * byte[] buffer: The buffer to write from -+ * int buffer_offset: The position in the buffer to write from -+ * int length: The number of (sequential) bytes to write. -+ * Returns: int, on success the number of bytes written, on failure -+ * a negative error code. -+ */ -+ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); -+ -+ /* -+ * Reads into the given byte array from the current position. -+ * Inputs: -+ * int fh: the filehandle to read from -+ * byte[] buffer: the byte array to read into -+ * int buffer_offset: where in the buffer to start writing -+ * int length: how much to read. -+ * There'd better be enough space in the buffer to write all -+ * the data from the given offset! -+ * Returns: the number of bytes read on success (as an int), -+ * or an error code otherwise. 
*/ -+ abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); -+ -+ /* -+ * Seeks to the given position in the given file. -+ * Inputs: -+ * int fh: The filehandle to seek in. -+ * long pos: The position to seek to. -+ * Returns: the new position (as a long) of the filehandle on success, -+ * or a negative error code on failure. */ -+ abstract protected long ceph_seek_from_start(int fh, long pos); -+} -diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFaker.java b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java -new file mode 100644 -index 0000000..c598f53 ---- /dev/null -+++ b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java -@@ -0,0 +1,483 @@ -+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- -+ -+/** -+ * -+ * Licensed under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -+ * implied. See the License for the specific language governing -+ * permissions and limitations under the License. -+ * -+ * -+ * This uses the local Filesystem but pretends to be communicating -+ * with a Ceph deployment, for unit testing the CephFileSystem. -+ */ -+ -+package org.apache.hadoop.fs.ceph; -+ -+ -+import java.net.URI; -+import java.util.Hashtable; -+import java.io.Closeable; -+import java.io.FileNotFoundException; -+import java.io.IOException; -+ -+import org.apache.commons.logging.Log; -+import org.apache.commons.logging.LogFactory; -+import org.apache.hadoop.conf.Configuration; -+import org.apache.hadoop.fs.BlockLocation; -+import org.apache.hadoop.fs.FileStatus; -+import org.apache.hadoop.fs.FileSystem; -+import org.apache.hadoop.fs.FSDataInputStream; -+import org.apache.hadoop.fs.FSDataOutputStream; -+import org.apache.hadoop.fs.Path; -+import org.apache.hadoop.fs.permission.FsPermission; -+ -+ -+class CephFaker extends CephFS { -+ private static final Log LOG = LogFactory.getLog(CephFaker.class); -+ FileSystem localFS; -+ String localPrefix; -+ int blockSize; -+ Configuration conf; -+ Hashtable<Integer, Object> files; -+ Hashtable<Integer, String> filenames; -+ int fileCount = 0; -+ boolean initialized = false; -+ -+ public CephFaker(Configuration con, Log log) { -+ conf = con; -+ files = new Hashtable<Integer, Object>(); -+ filenames = new Hashtable<Integer, String>(); -+ } -+ -+ protected boolean ceph_initializeClient(String args, int block_size) { -+ if (!initialized) { -+ // let's remember the default block_size -+ blockSize = block_size; -+ -+ /* for a real Ceph deployment, this starts up the client, -+ * sets debugging levels, etc. We just need to get the -+ * local FileSystem to use, and we'll ignore any -+ * command-line arguments. 
*/ -+ try { -+ localFS = FileSystem.getLocal(conf); -+ localFS.initialize(URI.create("file://localhost"), conf); -+ localFS.setVerifyChecksum(false); -+ String testDir = conf.get("hadoop.tmp.dir"); -+ -+ localPrefix = localFS.getWorkingDirectory().toString(); -+ int testDirLoc = localPrefix.indexOf(testDir) - 1; -+ -+ if (-2 == testDirLoc) { -+ testDirLoc = localPrefix.length(); -+ } -+ localPrefix = localPrefix.substring(0, testDirLoc) + "/" -+ + conf.get("hadoop.tmp.dir"); -+ -+ localFS.setWorkingDirectory( -+ new Path(localPrefix + "/user/" + System.getProperty("user.name"))); -+ // I don't know why, but the unit tests expect the default -+ // working dir to be /user/username, so satisfy them! -+ // debug("localPrefix is " + localPrefix, INFO); -+ } catch (IOException e) { -+ return false; -+ } -+ initialized = true; -+ } -+ return true; -+ } -+ -+ protected String ceph_getcwd() { -+ return sanitize_path(localFS.getWorkingDirectory().toString()); -+ } -+ -+ protected boolean ceph_setcwd(String path) { -+ localFS.setWorkingDirectory(new Path(prepare_path(path))); -+ return true; -+ } -+ -+ // the caller is responsible for ensuring empty dirs -+ protected boolean ceph_rmdir(String pth) { -+ Path path = new Path(prepare_path(pth)); -+ boolean ret = false; -+ -+ try { -+ if (localFS.listStatus(path).length <= 1) { -+ ret = localFS.delete(path, true); -+ } -+ } catch (IOException e) {} -+ return ret; -+ } -+ -+ // this needs to work on (empty) directories too -+ protected boolean ceph_unlink(String path) { -+ path = prepare_path(path); -+ boolean ret = false; -+ -+ if (ceph_isdirectory(path)) { -+ ret = ceph_rmdir(path); -+ } else { -+ try { -+ ret = localFS.delete(new Path(path), false); -+ } catch (IOException e) {} -+ } -+ return ret; -+ } -+ -+ protected boolean ceph_rename(String oldName, String newName) { -+ oldName = prepare_path(oldName); -+ newName = prepare_path(newName); -+ try { -+ Path parent = new Path(newName).getParent(); -+ Path newPath = new Path(newName); -+ -+ if (localFS.exists(parent) && !localFS.exists(newPath)) { -+ return localFS.rename(new Path(oldName), newPath); -+ } -+ return false; -+ } catch (IOException e) { -+ return false; -+ } -+ } -+ -+ protected boolean ceph_exists(String path) { -+ path = prepare_path(path); -+ boolean ret = false; -+ -+ try { -+ ret = localFS.exists(new Path(path)); -+ } catch (IOException e) {} -+ return ret; -+ } -+ -+ protected long ceph_getblocksize(String path) { -+ path = prepare_path(path); -+ try { -+ FileStatus status = localFS.getFileStatus(new Path(path)); -+ -+ return status.getBlockSize(); -+ } catch (FileNotFoundException e) { -+ return -CephFS.ENOENT; -+ } catch (IOException e) { -+ return -1; // just fail generically -+ } -+ } -+ -+ protected boolean ceph_isdirectory(String path) { -+ path = prepare_path(path); -+ try { -+ FileStatus status = localFS.getFileStatus(new Path(path)); -+ -+ return status.isDir(); -+ } catch (IOException e) { -+ return false; -+ } -+ } -+ -+ protected boolean ceph_isfile(String path) { -+ path = prepare_path(path); -+ boolean ret = false; -+ -+ try { -+ FileStatus status = localFS.getFileStatus(new Path(path)); -+ -+ ret = !status.isDir(); -+ } catch (Exception e) {} -+ return ret; -+ } -+ -+ protected String[] ceph_getdir(String path) { -+ path = prepare_path(path); -+ if (!ceph_isdirectory(path)) { -+ return null; -+ } -+ try { -+ FileStatus[] stats = localFS.listStatus(new Path(path)); -+ String[] names = new String[stats.length]; -+ String name; -+ -+ for (int i = 0; i < stats.length; 
++i) { -+ name = stats[i].getPath().toString(); -+ names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1); -+ } -+ return names; -+ } catch (IOException e) {} -+ return null; -+ } -+ -+ protected int ceph_mkdirs(String path, int mode) { -+ path = prepare_path(path); -+ // debug("ceph_mkdirs on " + path, INFO); -+ try { -+ if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) { -+ return 0; -+ } -+ } catch (IOException e) {} -+ if (ceph_isdirectory(path)) { // apparently it already existed -+ return -EEXIST; -+ } else if (ceph_isfile(path)) { -+ return -ENOTDIR; -+ } -+ return -1; -+ } -+ -+ /* -+ * Unlike a real Ceph deployment, you can't do opens on a directory. -+ * Since that has unpredictable behavior and you shouldn't do it anyway, -+ * it's okay. -+ */ -+ protected int ceph_open_for_append(String path) { -+ path = prepare_path(path); -+ FSDataOutputStream stream; -+ -+ try { -+ stream = localFS.append(new Path(path)); -+ files.put(new Integer(fileCount), stream); -+ filenames.put(new Integer(fileCount), path); -+ return fileCount++; -+ } catch (IOException e) {} -+ return -1; // failure -+ } -+ -+ protected int ceph_open_for_read(String path) { -+ path = prepare_path(path); -+ FSDataInputStream stream; -+ -+ try { -+ stream = localFS.open(new Path(path)); -+ files.put(new Integer(fileCount), stream); -+ filenames.put(new Integer(fileCount), path); -+ LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path); -+ return fileCount++; -+ } catch (IOException e) {} -+ return -1; // failure -+ } -+ -+ protected int ceph_open_for_overwrite(String path, int mode) { -+ path = prepare_path(path); -+ FSDataOutputStream stream; -+ -+ try { -+ stream = localFS.create(new Path(path)); -+ files.put(new Integer(fileCount), stream); -+ filenames.put(new Integer(fileCount), path); -+ LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path); -+ return fileCount++; -+ } catch (IOException e) {} -+ return -1; // failure -+ } -+ -+ protected int ceph_close(int filehandle) { -+ LOG.info("ceph_close(filehandle " + filehandle + ")"); -+ try { -+ ((Closeable) files.get(new Integer(filehandle))).close(); -+ if (null == files.get(new Integer(filehandle))) { -+ return -ENOENT; // this isn't quite the right error code, -+ // but the important part is it's negative -+ } -+ return 0; // hurray, success -+ } catch (NullPointerException ne) { -+ LOG.warn("ceph_close caught NullPointerException!" + ne); -+ } // err, how? -+ catch (IOException ie) { -+ LOG.warn("ceph_close caught IOException!" 
+ ie); -+ } -+ return -1; // failure -+ } -+ -+ protected boolean ceph_setPermission(String pth, int mode) { -+ pth = prepare_path(pth); -+ Path path = new Path(pth); -+ boolean ret = false; -+ -+ try { -+ localFS.setPermission(path, new FsPermission((short) mode)); -+ ret = true; -+ } catch (IOException e) {} -+ return ret; -+ } -+ -+ // rather than try and match a Ceph deployment's behavior exactly, -+ // just make bad things happen if they try and call methods after this -+ protected boolean ceph_kill_client() { -+ // debug("ceph_kill_client", INFO); -+ localFS.setWorkingDirectory(new Path(localPrefix)); -+ // debug("working dir is now " + localFS.getWorkingDirectory(), INFO); -+ try { -+ localFS.close(); -+ } catch (Exception e) {} -+ localFS = null; -+ files = null; -+ filenames = null; -+ return true; -+ } -+ -+ protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) { -+ pth = prepare_path(pth); -+ Path path = new Path(pth); -+ boolean ret = false; -+ -+ try { -+ FileStatus status = localFS.getFileStatus(path); -+ -+ fill.size = status.getLen(); -+ fill.is_dir = status.isDir(); -+ fill.block_size = status.getBlockSize(); -+ fill.mod_time = status.getModificationTime(); -+ fill.access_time = status.getAccessTime(); -+ fill.mode = status.getPermission().toShort(); -+ ret = true; -+ } catch (IOException e) {} -+ return ret; -+ } -+ -+ protected int ceph_replication(String path) { -+ path = prepare_path(path); -+ int ret = -1; // -1 for failure -+ -+ try { -+ ret = localFS.getFileStatus(new Path(path)).getReplication(); -+ } catch (IOException e) {} -+ return ret; -+ } -+ -+ protected String[] ceph_hosts(int fh, long offset) { -+ String[] ret = null; -+ -+ try { -+ BlockLocation[] locs = localFS.getFileBlockLocations( -+ localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))), -+ offset, 1); -+ -+ ret = locs[0].getNames(); -+ } catch (IOException e) {} catch (NullPointerException f) {} -+ return ret; -+ } -+ -+ protected int ceph_setTimes(String pth, long mtime, long atime) { -+ pth = prepare_path(pth); -+ Path path = new Path(pth); -+ int ret = -1; // generic fail -+ -+ try { -+ localFS.setTimes(path, mtime, atime); -+ ret = 0; -+ } catch (IOException e) {} -+ return ret; -+ } -+ -+ protected long ceph_getpos(int fh) { -+ long ret = -1; // generic fail -+ -+ try { -+ Object stream = files.get(new Integer(fh)); -+ -+ if (stream instanceof FSDataInputStream) { -+ ret = ((FSDataInputStream) stream).getPos(); -+ } else if (stream instanceof FSDataOutputStream) { -+ ret = ((FSDataOutputStream) stream).getPos(); -+ } -+ } catch (IOException e) {} catch (NullPointerException f) {} -+ return ret; -+ } -+ -+ protected int ceph_write(int fh, byte[] buffer, -+ int buffer_offset, int length) { -+ LOG.info( -+ "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:" -+ + length); -+ long ret = -1; // generic fail -+ -+ try { -+ FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh)); -+ -+ LOG.info("ceph_write got outputstream"); -+ long startPos = os.getPos(); -+ -+ os.write(buffer, buffer_offset, length); -+ ret = os.getPos() - startPos; -+ } catch (IOException e) { -+ LOG.warn("ceph_write caught IOException!"); -+ } catch (NullPointerException f) { -+ LOG.warn("ceph_write caught NullPointerException!"); -+ } -+ return (int) ret; -+ } -+ -+ protected int ceph_read(int fh, byte[] buffer, -+ int buffer_offset, int length) { -+ long ret = -1; // generic fail -+ -+ try { -+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh)); -+ long 
startPos = is.getPos(); -+ -+ is.read(buffer, buffer_offset, length); -+ ret = is.getPos() - startPos; -+ } catch (IOException e) {} catch (NullPointerException f) {} -+ return (int) ret; -+ } -+ -+ protected long ceph_seek_from_start(int fh, long pos) { -+ LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")"); -+ long ret = -1; // generic fail -+ -+ try { -+ LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh))); -+ if (null == files.get(new Integer(fh))) { -+ LOG.warn("ceph_seek_from_start: is is null!"); -+ } -+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh)); -+ -+ LOG.info("ceph_seek_from_start retrieved is!"); -+ is.seek(pos); -+ ret = is.getPos(); -+ } catch (IOException e) { -+ LOG.warn("ceph_seek_from_start caught IOException!"); -+ } catch (NullPointerException f) { -+ LOG.warn("ceph_seek_from_start caught NullPointerException!"); -+ } -+ return (int) ret; -+ } -+ -+ /* -+ * We need to remove the localFS file prefix before returning to Ceph -+ */ -+ private String sanitize_path(String path) { -+ // debug("sanitize_path(" + path + ")", INFO); -+ /* if (path.startsWith("file:")) -+ path = path.substring("file:".length()); */ -+ if (path.startsWith(localPrefix)) { -+ path = path.substring(localPrefix.length()); -+ if (path.length() == 0) { // it was a root path -+ path = "/"; -+ } -+ } -+ // debug("sanitize_path returning " + path, INFO); -+ return path; -+ } -+ -+ /* -+ * If it's an absolute path we need to shove the -+ * test dir onto the front as a prefix. -+ */ -+ private String prepare_path(String path) { -+ // debug("prepare_path(" + path + ")", INFO); -+ if (path.startsWith("/")) { -+ path = localPrefix + path; -+ } else if (path.equals("..")) { -+ if (ceph_getcwd().equals("/")) { -+ path = "."; -+ } // you can't go up past root! -+ } -+ // debug("prepare_path returning" + path, INFO); -+ return path; -+ } -+} -diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java -new file mode 100644 -index 0000000..95f2223 ---- /dev/null -+++ b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java -@@ -0,0 +1,804 @@ -+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- -+ -+/** -+ * -+ * Licensed under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -+ * implied. See the License for the specific language governing -+ * permissions and limitations under the License. -+ * -+ * -+ * Implements the Hadoop FS interfaces to allow applications to store -+ * files in Ceph. 
-+ */ -+package org.apache.hadoop.fs.ceph; -+ -+ -+import java.io.IOException; -+import java.io.FileNotFoundException; -+import java.io.OutputStream; -+import java.net.URI; -+import java.net.InetAddress; -+import java.util.EnumSet; -+import java.lang.Math; -+import java.util.ArrayList; -+ -+import org.apache.commons.logging.Log; -+import org.apache.commons.logging.LogFactory; -+import org.apache.hadoop.conf.Configuration; -+import org.apache.hadoop.fs.BlockLocation; -+import org.apache.hadoop.fs.FSDataInputStream; -+import org.apache.hadoop.fs.FSInputStream; -+import org.apache.hadoop.fs.FSDataOutputStream; -+import org.apache.hadoop.fs.FileSystem; -+import org.apache.hadoop.fs.FileUtil; -+import org.apache.hadoop.fs.Path; -+import org.apache.hadoop.fs.permission.FsPermission; -+import org.apache.hadoop.util.Progressable; -+import org.apache.hadoop.fs.FileStatus; -+import org.apache.hadoop.net.DNS; -+ -+ -+/** -+ * <p> -+ * A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph.</a>. -+ * This will not start a Ceph instance; one must already be running. -+ * </p> -+ * Configuration of the CephFileSystem is handled via a few Hadoop -+ * Configuration properties: <br> -+ * fs.ceph.monAddr -- the ip address/port of the monitor to connect to. <br> -+ * fs.ceph.libDir -- the directory that libcephfs and libhadoopceph are -+ * located in. This assumes Hadoop is being run on a linux-style machine -+ * with names like libcephfs.so. -+ * fs.ceph.commandLine -- if you prefer you can fill in this property -+ * just as you would when starting Ceph up from the command line. Specific -+ * properties override any configuration specified here. -+ * <p> -+ * You can also enable debugging of the CephFileSystem and Ceph itself: <br> -+ * fs.ceph.debug -- if 'true' will print out method enter/exit messages, -+ * plus a little more. -+ * fs.ceph.clientDebug/fs.ceph.messengerDebug -- will print out debugging -+ * from the respective Ceph system of at least that importance. -+ */ -+public class CephFileSystem extends FileSystem { -+ private static final Log LOG = LogFactory.getLog(CephFileSystem.class); -+ private URI uri; -+ -+ private Path workingDir; -+ private final Path root; -+ private CephFS ceph = null; -+ -+ private static String CEPH_NAMESERVER; -+ private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver"; -+ private static final String CEPH_NAMESERVER_DEFAULT = "localhost"; -+ -+ /** -+ * Create a new CephFileSystem. -+ */ -+ public CephFileSystem() { -+ root = new Path("/"); -+ } -+ -+ /** -+ * Used for testing purposes, this constructor -+ * sets the given CephFS instead of defaulting to a -+ * CephTalker (with its assumed real Ceph instance to talk to). -+ */ -+ public CephFileSystem(CephFS ceph_fs) { -+ super(); -+ root = new Path("/"); -+ ceph = ceph_fs; -+ } -+ -+ /** -+ * Lets you get the URI of this CephFileSystem. -+ * @return the URI. -+ */ -+ public URI getUri() { -+ LOG.debug("getUri:exit with return " + uri); -+ return uri; -+ } -+ -+ /** -+ * Should be called after constructing a CephFileSystem but before calling -+ * any other methods. -+ * Starts up the connection to Ceph, reads in configuraton options, etc. -+ * @param uri The URI for this filesystem. -+ * @param conf The Hadoop Configuration to retrieve properties from. -+ * @throws IOException if necessary properties are unset. 
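The class Javadoc above lists the fs.ceph.* properties the filesystem reads at initialize() time. A minimal sketch of a job configuration that sets them, assuming a hypothetical monitor address, library directory and block size (none of these values come from the patch):

import org.apache.hadoop.conf.Configuration;

public class CephConfSketch {
  public static Configuration cephConf() {
    Configuration conf = new Configuration();
    conf.set("fs.ceph.monAddr", "10.0.0.1:6789");              // hypothetical monitor ip:port
    conf.set("fs.ceph.libDir", "/usr/lib/ceph");               // hypothetical dir holding libcephfs.so and libhadoopcephfs.so
    conf.set("fs.ceph.commandLine", "-c /etc/ceph/ceph.conf"); // optional raw arguments; specific keys override these
    conf.setInt("fs.ceph.blockSize", 1 << 26);                 // 64 MB, the default the code falls back to
    conf.setBoolean("fs.ceph.debug", true);                    // method enter/exit logging
    return conf;
  }
}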
-+ */ -+ @Override -+ public void initialize(URI uri, Configuration conf) throws IOException { -+ super.initialize(uri, conf); -+ setConf(conf); -+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); -+ if (ceph == null) { -+ ceph = new CephTalker(conf, LOG); -+ } -+ -+ CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT); -+ -+ // build up the arguments for Ceph -+ String arguments = "CephFSInterface"; -+ -+ arguments += conf.get("fs.ceph.commandLine", ""); -+ if (conf.get("fs.ceph.clientDebug") != null) { -+ arguments += " --debug_client "; -+ arguments += conf.get("fs.ceph.clientDebug"); -+ } -+ if (conf.get("fs.ceph.messengerDebug") != null) { -+ arguments += " --debug_ms "; -+ arguments += conf.get("fs.ceph.messengerDebug"); -+ } -+ if (conf.get("fs.ceph.monAddr") != null) { -+ arguments += " -m "; -+ arguments += conf.get("fs.ceph.monAddr"); -+ } -+ arguments += " --client-readahead-max-periods=" -+ + conf.get("fs.ceph.readahead", "1"); -+ // make sure they gave us a ceph monitor address or conf file -+ LOG.info("initialize:Ceph initialization arguments: " + arguments); -+ if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1) -+ && (arguments.indexOf("-c") == -1)) { -+ LOG.fatal("initialize:You need to specify a Ceph monitor address."); -+ throw new IOException( -+ "You must specify a Ceph monitor address or config file!"); -+ } -+ // Initialize the client -+ if (!ceph.ceph_initializeClient(arguments, -+ conf.getInt("fs.ceph.blockSize", 1 << 26))) { -+ LOG.fatal("initialize:Ceph initialization failed!"); -+ throw new IOException("Ceph initialization failed!"); -+ } -+ LOG.info("initialize:Ceph initialized client. Setting cwd to /"); -+ ceph.ceph_setcwd("/"); -+ LOG.debug("initialize:exit"); -+ -+ this.workingDir = getHomeDirectory(); -+ } -+ -+ /** -+ * Close down the CephFileSystem. Runs the base-class close method -+ * and then kills the Ceph client itself. -+ */ -+ @Override -+ public void close() throws IOException { -+ LOG.debug("close:enter"); -+ super.close(); // this method does stuff, make sure it's run! -+ LOG.trace("close: Calling ceph_kill_client from Java"); -+ ceph.ceph_kill_client(); -+ LOG.debug("close:exit"); -+ } -+ -+ /** -+ * Get an FSDataOutputStream to append onto a file. -+ * @param file The File you want to append onto -+ * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like. -+ * @param progress The Progressable to report progress to. -+ * Reporting is limited but exists. -+ * @return An FSDataOutputStream that connects to the file on Ceph. -+ * @throws IOException If the file cannot be found or appended to. 
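initialize() above assembles a CephFSInterface argument string from those properties and refuses to start unless a monitor address or a -c config file is supplied. A sketch of constructing and starting the filesystem directly, modelled on the TestCeph unit test added later in this patch (the ceph://null URI comes from that test; the monitor address and library directory are placeholders, and working native libraries under fs.ceph.libDir are assumed):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ceph.CephFileSystem;

public class CephInitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.ceph.monAddr", "10.0.0.1:6789");   // placeholder; without a monitor or config file, initialize() throws
    conf.set("fs.ceph.libDir", "/usr/lib/ceph");    // placeholder; CephTalker loads the JNI libraries from here
    CephFileSystem fs = new CephFileSystem();       // no CephFS argument, so a real CephTalker is created
    fs.initialize(URI.create("ceph://null"), conf); // URI pattern taken from TestCeph
    try {
      System.out.println("cwd: " + fs.getWorkingDirectory());
    } finally {
      fs.close();                                   // runs ceph_kill_client()
    }
  }
}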
-+ */ -+ public FSDataOutputStream append(Path file, int bufferSize, -+ Progressable progress) throws IOException { -+ LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize); -+ Path abs_path = makeAbsolute(file); -+ -+ if (progress != null) { -+ progress.progress(); -+ } -+ LOG.trace("append: Entering ceph_open_for_append from Java"); -+ int fd = ceph.ceph_open_for_append(getCephPath(abs_path)); -+ -+ LOG.trace("append: Returned to Java"); -+ if (progress != null) { -+ progress.progress(); -+ } -+ if (fd < 0) { // error in open -+ throw new IOException( -+ "append: Open for append failed on path \"" + abs_path.toString() -+ + "\""); -+ } -+ CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd, -+ bufferSize); -+ -+ LOG.debug("append:exit"); -+ return new FSDataOutputStream(cephOStream, statistics); -+ } -+ -+ /** -+ * Get the current working directory for the given file system -+ * @return the directory Path -+ */ -+ public Path getWorkingDirectory() { -+ return workingDir; -+ } -+ -+ /** -+ * Set the current working directory for the given file system. All relative -+ * paths will be resolved relative to it. -+ * -+ * @param dir The directory to change to. -+ */ -+ @Override -+ public void setWorkingDirectory(Path dir) { -+ workingDir = makeAbsolute(dir); -+ } -+ -+ /** -+ * Return only the path component from a potentially fully qualified path. -+ */ -+ private String getCephPath(Path path) { -+ if (!path.isAbsolute()) { -+ throw new IllegalArgumentException("Path must be absolute: " + path); -+ } -+ return path.toUri().getPath(); -+ } -+ -+ /** -+ * Check if a path exists. -+ * Overriden because it's moderately faster than the generic implementation. -+ * @param path The file to check existence on. -+ * @return true if the file exists, false otherwise. -+ */ -+ @Override -+ public boolean exists(Path path) throws IOException { -+ LOG.debug("exists:enter with path " + path); -+ boolean result; -+ Path abs_path = makeAbsolute(path); -+ -+ if (abs_path.equals(root)) { -+ result = true; -+ } else { -+ LOG.trace( -+ "exists:Calling ceph_exists from Java on path " + abs_path.toString()); -+ result = ceph.ceph_exists(getCephPath(abs_path)); -+ LOG.trace("exists:Returned from ceph_exists to Java"); -+ } -+ LOG.debug("exists:exit with value " + result); -+ return result; -+ } -+ -+ /** -+ * Create a directory and any nonexistent parents. Any portion -+ * of the directory tree can exist without error. -+ * @param path The directory path to create -+ * @param perms The permissions to apply to the created directories. -+ * @return true if successful, false otherwise -+ * @throws IOException if the path is a child of a file. -+ */ -+ @Override -+ public boolean mkdirs(Path path, FsPermission perms) throws IOException { -+ LOG.debug("mkdirs:enter with path " + path); -+ Path abs_path = makeAbsolute(path); -+ -+ LOG.trace("mkdirs:calling ceph_mkdirs from Java"); -+ int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort()); -+ -+ if (result != 0) { -+ LOG.warn( -+ "mkdirs: make directory " + abs_path + "Failing with result " + result); -+ if (-ceph.ENOTDIR == result) { -+ throw new IOException("Parent path is not a directory"); -+ } -+ return false; -+ } else { -+ LOG.debug("mkdirs:exiting succesfully"); -+ return true; -+ } -+ } -+ -+ /** -+ * Check if a path is a file. This is moderately faster than the -+ * generic implementation. -+ * @param path The path to check. -+ * @return true if the path is definitely a file, false otherwise. 
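getCephPath() above hands only the path component of an absolute Path to the native layer, dropping scheme and authority. An illustrative round trip (the authority and file name are made up):

import org.apache.hadoop.fs.Path;

public class CephPathSketch {
  public static void main(String[] args) {
    Path qualified = new Path("ceph://null/user/hadoop/input/part-00000");
    String forNativeLayer = qualified.toUri().getPath();   // same reduction getCephPath() performs
    System.out.println(forNativeLayer);                    // /user/hadoop/input/part-00000
  }
}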
-+ */ -+ @Override -+ public boolean isFile(Path path) throws IOException { -+ LOG.debug("isFile:enter with path " + path); -+ Path abs_path = makeAbsolute(path); -+ boolean result; -+ -+ if (abs_path.equals(root)) { -+ result = false; -+ } else { -+ LOG.trace("isFile:entering ceph_isfile from Java"); -+ result = ceph.ceph_isfile(getCephPath(abs_path)); -+ } -+ LOG.debug("isFile:exit with result " + result); -+ return result; -+ } -+ -+ /** -+ * Get stat information on a file. This does not fill owner or group, as -+ * Ceph's support for these is a bit different than HDFS'. -+ * @param path The path to stat. -+ * @return FileStatus object containing the stat information. -+ * @throws FileNotFoundException if the path could not be resolved. -+ */ -+ public FileStatus getFileStatus(Path path) throws IOException { -+ LOG.debug("getFileStatus:enter with path " + path); -+ Path abs_path = makeAbsolute(path); -+ // sadly, Ceph doesn't really do uids/gids just yet, but -+ // everything else is filled -+ FileStatus status; -+ Stat lstat = new Stat(); -+ -+ LOG.trace("getFileStatus: calling ceph_stat from Java"); -+ if (ceph.ceph_stat(getCephPath(abs_path), lstat)) { -+ status = new FileStatus(lstat.size, lstat.is_dir, -+ ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size, -+ lstat.mod_time, lstat.access_time, -+ new FsPermission((short) lstat.mode), System.getProperty("user.name"), null, -+ path.makeQualified(this)); -+ } else { // fail out -+ throw new FileNotFoundException( -+ "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path -+ + " does not exist or could not be accessed"); -+ } -+ -+ LOG.debug("getFileStatus:exit"); -+ return status; -+ } -+ -+ /** -+ * Get the FileStatus for each listing in a directory. -+ * @param path The directory to get listings from. -+ * @return FileStatus[] containing one FileStatus for each directory listing; -+ * null if path does not exist. -+ */ -+ public FileStatus[] listStatus(Path path) throws IOException { -+ LOG.debug("listStatus:enter with path " + path); -+ Path abs_path = makeAbsolute(path); -+ Path[] paths = listPaths(abs_path); -+ -+ if (paths != null) { -+ FileStatus[] statuses = new FileStatus[paths.length]; -+ -+ for (int i = 0; i < paths.length; ++i) { -+ statuses[i] = getFileStatus(paths[i]); -+ } -+ LOG.debug("listStatus:exit"); -+ return statuses; -+ } -+ -+ if (isFile(path)) { -+ return new FileStatus[] { getFileStatus(path) }; -+ } -+ -+ return null; -+ } -+ -+ @Override -+ public void setPermission(Path p, FsPermission permission) throws IOException { -+ LOG.debug( -+ "setPermission:enter with path " + p + " and permissions " + permission); -+ Path abs_path = makeAbsolute(p); -+ -+ LOG.trace("setPermission:calling ceph_setpermission from Java"); -+ ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort()); -+ LOG.debug("setPermission:exit"); -+ } -+ -+ /** -+ * Set access/modification times of a file. -+ * @param p The path -+ * @param mtime Set modification time in number of millis since Jan 1, 1970. -+ * @param atime Set access time in number of millis since Jan 1, 1970. 
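getFileStatus() above copies the native Stat into a Hadoop FileStatus, filling owner from the local user.name property and leaving group null. A small sketch of what a caller sees through listStatus(); the helper and its arguments are illustrative only:

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CephStatSketch {
  // Prints the fields getFileStatus() fills from the native Stat; owner is the
  // local user, group is null.
  static void describe(FileSystem cephFs, Path dir) throws IOException {
    for (FileStatus st : cephFs.listStatus(dir)) {
      System.out.println(st.getPath()
          + " len=" + st.getLen()
          + " dir=" + st.isDir()
          + " mode=" + st.getPermission()
          + " mtime=" + st.getModificationTime());
    }
  }
}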
-+ */ -+ @Override -+ public void setTimes(Path p, long mtime, long atime) throws IOException { -+ LOG.debug( -+ "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime); -+ Path abs_path = makeAbsolute(p); -+ -+ LOG.trace("setTimes:calling ceph_setTimes from Java"); -+ int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime); -+ -+ if (r < 0) { -+ throw new IOException( -+ "Failed to set times on path " + abs_path.toString() + " Error code: " -+ + r); -+ } -+ LOG.debug("setTimes:exit"); -+ } -+ -+ /** -+ * Create a new file and open an FSDataOutputStream that's connected to it. -+ * @param path The file to create. -+ * @param permission The permissions to apply to the file. -+ * @param overwrite If true, overwrite any existing file with -+ * this name; otherwise don't. -+ * @param bufferSize Ceph does internal buffering, but you can buffer -+ * in the Java code too if you like. -+ * @param replication Ignored by Ceph. This can be -+ * configured via Ceph configuration. -+ * @param blockSize Ignored by Ceph. You can set client-wide block sizes -+ * via the fs.ceph.blockSize param if you like. -+ * @param progress A Progressable to report back to. -+ * Reporting is limited but exists. -+ * @return An FSDataOutputStream pointing to the created file. -+ * @throws IOException if the path is an -+ * existing directory, or the path exists but overwrite is false, or there is a -+ * failure in attempting to open for append with Ceph. -+ */ -+ public FSDataOutputStream create(Path path, -+ FsPermission permission, -+ boolean overwrite, -+ int bufferSize, -+ short replication, -+ long blockSize, -+ Progressable progress) throws IOException { -+ LOG.debug("create:enter with path " + path); -+ Path abs_path = makeAbsolute(path); -+ -+ if (progress != null) { -+ progress.progress(); -+ } -+ // We ignore replication since that's not configurable here, and -+ // progress reporting is quite limited. 
-+ // Required semantics: if the file exists, overwrite if 'overwrite' is set; -+ // otherwise, throw an exception -+ -+ // Step 1: existence test -+ boolean exists = exists(abs_path); -+ -+ if (exists) { -+ if (getFileStatus(abs_path).isDir()) { -+ throw new IOException( -+ "create: Cannot overwrite existing directory \"" + path.toString() -+ + "\" with a file"); -+ } -+ if (!overwrite) { -+ throw new IOException( -+ "createRaw: Cannot open existing file \"" + abs_path.toString() -+ + "\" for writing without overwrite flag"); -+ } -+ } -+ -+ if (progress != null) { -+ progress.progress(); -+ } -+ -+ // Step 2: create any nonexistent directories in the path -+ if (!exists) { -+ Path parent = abs_path.getParent(); -+ -+ if (parent != null) { // if parent is root, we're done -+ int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort()); -+ -+ if (!(r == 0 || r == -ceph.EEXIST)) { -+ throw new IOException("Error creating parent directory; code: " + r); -+ } -+ } -+ if (progress != null) { -+ progress.progress(); -+ } -+ } -+ // Step 3: open the file -+ LOG.trace("calling ceph_open_for_overwrite from Java"); -+ int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path), -+ (int) permission.toShort()); -+ -+ if (progress != null) { -+ progress.progress(); -+ } -+ LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh); -+ if (fh < 0) { -+ throw new IOException( -+ "create: Open for overwrite failed on path \"" + path.toString() -+ + "\""); -+ } -+ -+ // Step 4: create the stream -+ OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh, -+ bufferSize); -+ -+ LOG.debug("create:exit"); -+ return new FSDataOutputStream(cephOStream, statistics); -+ } -+ -+ /** -+ * Open a Ceph file and attach the file handle to an FSDataInputStream. -+ * @param path The file to open -+ * @param bufferSize Ceph does internal buffering; but you can buffer in -+ * the Java code too if you like. -+ * @return FSDataInputStream reading from the given path. -+ * @throws IOException if the path DNE or is a -+ * directory, or there is an error getting data to set up the FSDataInputStream. -+ */ -+ public FSDataInputStream open(Path path, int bufferSize) throws IOException { -+ LOG.debug("open:enter with path " + path); -+ Path abs_path = makeAbsolute(path); -+ -+ int fh = ceph.ceph_open_for_read(getCephPath(abs_path)); -+ -+ if (fh < 0) { // uh-oh, something's bad! -+ if (fh == -ceph.ENOENT) { // well that was a stupid open -+ throw new IOException( -+ "open: absolute path \"" + abs_path.toString() -+ + "\" does not exist"); -+ } else { // hrm...the file exists but we can't open it :( -+ throw new IOException("open: Failed to open file " + abs_path.toString()); -+ } -+ } -+ -+ if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories -+ // but that doesn't mean you should in Hadoop! -+ ceph.ceph_close(fh); -+ throw new IOException( -+ "open: absolute path \"" + abs_path.toString() + "\" is a directory!"); -+ } -+ Stat lstat = new Stat(); -+ -+ LOG.trace("open:calling ceph_stat from Java"); -+ ceph.ceph_stat(getCephPath(abs_path), lstat); -+ LOG.trace("open:returned to Java"); -+ long size = lstat.size; -+ -+ if (size < 0) { -+ throw new IOException( -+ "Failed to get file size for file " + abs_path.toString() -+ + " but succeeded in opening file. 
Something bizarre is going on."); -+ } -+ FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size, -+ bufferSize); -+ -+ LOG.debug("open:exit"); -+ return new FSDataInputStream(cephIStream); -+ } -+ -+ /** -+ * Rename a file or directory. -+ * @param src The current path of the file/directory -+ * @param dst The new name for the path. -+ * @return true if the rename succeeded, false otherwise. -+ */ -+ @Override -+ public boolean rename(Path src, Path dst) throws IOException { -+ LOG.debug("rename:enter with src:" + src + " and dest:" + dst); -+ Path abs_src = makeAbsolute(src); -+ Path abs_dst = makeAbsolute(dst); -+ -+ LOG.trace("calling ceph_rename from Java"); -+ boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst)); -+ -+ if (!result) { -+ boolean isDir = false; -+ try { -+ isDir = getFileStatus(abs_dst).isDir(); -+ } catch (FileNotFoundException e) {} -+ if (isDir) { // move the srcdir into destdir -+ LOG.debug("ceph_rename failed but dst is a directory!"); -+ Path new_dst = new Path(abs_dst, abs_src.getName()); -+ -+ result = rename(abs_src, new_dst); -+ LOG.debug( -+ "attempt to move " + abs_src.toString() + " to " -+ + new_dst.toString() + "has result:" + result); -+ } -+ } -+ LOG.debug("rename:exit with result: " + result); -+ return result; -+ } -+ -+ /* -+ * Attempt to convert an IP into its hostname -+ */ -+ private String[] ips2Hosts(String[] ips) { -+ ArrayList<String> hosts = new ArrayList<String>(); -+ for (String ip : ips) { -+ try { -+ String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER); -+ if (host.charAt(host.length()-1) == '.') { -+ host = host.substring(0, host.length()-1); -+ } -+ hosts.add(host); /* append */ -+ } catch (Exception e) { -+ LOG.error("reverseDns ["+ip+"] failed: "+ e); -+ } -+ } -+ return hosts.toArray(new String[hosts.size()]); -+ } -+ -+ /** -+ * Get a BlockLocation object for each block in a file. -+ * -+ * Note that this doesn't include port numbers in the name field as -+ * Ceph handles slow/down servers internally. This data should be used -+ * only for selecting which servers to run which jobs on. -+ * -+ * @param file A FileStatus object corresponding to the file you want locations for. -+ * @param start The offset of the first part of the file you are interested in. -+ * @param len The amount of the file past the offset you are interested in. -+ * @return A BlockLocation[] where each object corresponds to a block within -+ * the given range. 
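ips2Hosts() above resolves the OSD IPs returned by ceph_hosts() through Hadoop's DNS helper, using the fs.ceph.nameserver setting and trimming the trailing dot. A standalone sketch of that lookup; the IP address and nameserver are placeholders, and failed lookups are skipped just as above:

import java.net.InetAddress;
import org.apache.hadoop.net.DNS;

public class ReverseDnsSketch {
  // Returns the hostname for one OSD ip, or null if the lookup fails
  // (unresolvable addresses are simply dropped, as in ips2Hosts()).
  public static String toHost(String ip, String nameserver) {
    try {
      String host = DNS.reverseDns(InetAddress.getByName(ip), nameserver);
      if (host.endsWith(".")) {
        host = host.substring(0, host.length() - 1);   // strip the trailing dot
      }
      return host;
    } catch (Exception e) {
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(toHost("10.0.0.2", "localhost"));  // placeholder ip and nameserver
  }
}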
-+ */ -+ @Override -+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { -+ Path abs_path = makeAbsolute(file.getPath()); -+ -+ int fh = ceph.ceph_open_for_read(getCephPath(abs_path)); -+ if (fh < 0) { -+ LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!"); -+ return null; -+ } -+ -+ long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path)); -+ BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)]; -+ -+ for (int i = 0; i < locations.length; ++i) { -+ long offset = start + i * blockSize; -+ long blockStart = start + i * blockSize - (start % blockSize); -+ String ips[] = ceph.ceph_hosts(fh, offset); -+ String hosts[] = ips2Hosts(ips); -+ locations[i] = new BlockLocation(null, hosts, blockStart, blockSize); -+ LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]); -+ } -+ -+ ceph.ceph_close(fh); -+ return locations; -+ } -+ -+ @Deprecated -+ public boolean delete(Path path) throws IOException { -+ return delete(path, false); -+ } -+ -+ /** -+ * Delete the given path, and optionally its children. -+ * @param path the path to delete. -+ * @param recursive If the path is a non-empty directory and this is false, -+ * delete will throw an IOException. If path is a file this is ignored. -+ * @return true if the delete succeeded, false otherwise (including if -+ * path doesn't exist). -+ * @throws IOException if you attempt to non-recursively delete a directory, -+ * or you attempt to delete the root directory. -+ */ -+ public boolean delete(Path path, boolean recursive) throws IOException { -+ LOG.debug("delete:enter with path " + path + " and recursive=" + recursive); -+ Path abs_path = makeAbsolute(path); -+ -+ // sanity check -+ if (abs_path.equals(root)) { -+ throw new IOException("Error: deleting the root directory is a Bad Idea."); -+ } -+ if (!exists(abs_path)) { -+ return false; -+ } -+ -+ // if the path is a file, try to delete it. -+ if (isFile(abs_path)) { -+ LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path); -+ boolean result = ceph.ceph_unlink(getCephPath(abs_path)); -+ -+ if (!result) { -+ LOG.error( -+ "delete: failed to delete file \"" + abs_path.toString() + "\"."); -+ } -+ LOG.debug("delete:exit with success=" + result); -+ return result; -+ } -+ -+ /* The path is a directory, so recursively try to delete its contents, -+ and then delete the directory. */ -+ // get the entries; listPaths will remove . and .. for us -+ Path[] contents = listPaths(abs_path); -+ -+ if (contents == null) { -+ LOG.error( -+ "delete: Failed to read contents of directory \"" -+ + abs_path.toString() + "\" while trying to delete it, BAILING"); -+ return false; -+ } -+ if (!recursive && contents.length > 0) { -+ throw new IOException("Directories must be deleted recursively!"); -+ } -+ // delete the entries -+ LOG.debug("delete: recursively calling delete on contents of " + abs_path); -+ for (Path p : contents) { -+ if (!delete(p, true)) { -+ LOG.error( -+ "delete: Failed to delete file \"" + p.toString() -+ + "\" while recursively deleting \"" + abs_path.toString() -+ + "\", BAILING"); -+ return false; -+ } -+ } -+ // if we've come this far it's a now-empty directory, so delete it! 
-+ boolean result = ceph.ceph_rmdir(getCephPath(abs_path)); -+ -+ if (!result) { -+ LOG.error( -+ "delete: failed to delete \"" + abs_path.toString() + "\", BAILING"); -+ } -+ LOG.debug("delete:exit"); -+ return result; -+ } -+ -+ /** -+ * Returns the default replication value of 1. This may -+ * NOT be the actual value, as replication is controlled -+ * by a separate Ceph configuration. -+ */ -+ @Override -+ public short getDefaultReplication() { -+ return 1; -+ } -+ -+ /** -+ * Get the default block size. -+ * @return the default block size, in bytes, as a long. -+ */ -+ @Override -+ public long getDefaultBlockSize() { -+ return getConf().getInt("fs.ceph.blockSize", 1 << 26); -+ } -+ -+ /** -+ * Adds the working directory to path if path is not already -+ * an absolute path. The URI scheme is not removed here. It -+ * is removed only when users (e.g. ceph native calls) need -+ * the path-only portion. -+ */ -+ private Path makeAbsolute(Path path) { -+ if (path.isAbsolute()) { -+ return path; -+ } -+ return new Path(workingDir, path); -+ } -+ -+ private Path[] listPaths(Path path) throws IOException { -+ LOG.debug("listPaths:enter with path " + path); -+ String dirlist[]; -+ -+ Path abs_path = makeAbsolute(path); -+ -+ // If it's a directory, get the listing. Otherwise, complain and give up. -+ LOG.debug("calling ceph_getdir from Java with path " + abs_path); -+ dirlist = ceph.ceph_getdir(getCephPath(abs_path)); -+ LOG.debug("returning from ceph_getdir to Java"); -+ -+ if (dirlist == null) { -+ return null; -+ } -+ -+ // convert the strings to Paths -+ Path[] paths = new Path[dirlist.length]; -+ -+ for (int i = 0; i < dirlist.length; ++i) { -+ LOG.trace( -+ "Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" -+ + dirlist[i] + "\""); -+ // convert each listing to an absolute path -+ Path raw_path = new Path(dirlist[i]); -+ -+ if (raw_path.isAbsolute()) { -+ paths[i] = raw_path; -+ } else { -+ paths[i] = new Path(abs_path, raw_path); -+ } -+ } -+ LOG.debug("listPaths:exit"); -+ return paths; -+ } -+ -+ static class Stat { -+ public long size; -+ public boolean is_dir; -+ public long block_size; -+ public long mod_time; -+ public long access_time; -+ public int mode; -+ -+ public Stat() {} -+ } -+} -diff --git a/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java -new file mode 100644 -index 0000000..d9668d0 ---- /dev/null -+++ b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java -@@ -0,0 +1,254 @@ -+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- -+ -+/** -+ * -+ * Licensed under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -+ * implied. See the License for the specific language governing -+ * permissions and limitations under the License. -+ * -+ * -+ * Implements the Hadoop FS interfaces to allow applications to store -+ * files in Ceph. 
-+ */ -+package org.apache.hadoop.fs.ceph; -+ -+ -+import java.io.IOException; -+ -+import org.apache.commons.logging.Log; -+import org.apache.commons.logging.LogFactory; -+import org.apache.hadoop.conf.Configuration; -+import org.apache.hadoop.fs.FSInputStream; -+ -+ -+/** -+ * <p> -+ * An {@link FSInputStream} for a CephFileSystem and corresponding -+ * Ceph instance. -+ */ -+public class CephInputStream extends FSInputStream { -+ private static final Log LOG = LogFactory.getLog(CephInputStream.class); -+ private boolean closed; -+ -+ private int fileHandle; -+ -+ private long fileLength; -+ -+ private CephFS ceph; -+ -+ private byte[] buffer; -+ private int bufPos = 0; -+ private int bufValid = 0; -+ private long cephPos = 0; -+ -+ /** -+ * Create a new CephInputStream. -+ * @param conf The system configuration. Unused. -+ * @param fh The filehandle provided by Ceph to reference. -+ * @param flength The current length of the file. If the length changes -+ * you will need to close and re-open it to access the new data. -+ */ -+ public CephInputStream(Configuration conf, CephFS cephfs, -+ int fh, long flength, int bufferSize) { -+ // Whoever's calling the constructor is responsible for doing the actual ceph_open -+ // call and providing the file handle. -+ fileLength = flength; -+ fileHandle = fh; -+ closed = false; -+ ceph = cephfs; -+ buffer = new byte[bufferSize]; -+ LOG.debug( -+ "CephInputStream constructor: initializing stream with fh " + fh -+ + " and file length " + flength); -+ -+ } -+ -+ /** Ceph likes things to be closed before it shuts down, -+ * so closing the IOStream stuff voluntarily in a finalizer is good -+ */ -+ protected void finalize() throws Throwable { -+ try { -+ if (!closed) { -+ close(); -+ } -+ } finally { -+ super.finalize(); -+ } -+ } -+ -+ private synchronized boolean fillBuffer() throws IOException { -+ bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length); -+ bufPos = 0; -+ if (bufValid < 0) { -+ int err = bufValid; -+ -+ bufValid = 0; -+ // attempt to reset to old position. If it fails, too bad. -+ ceph.ceph_seek_from_start(fileHandle, cephPos); -+ throw new IOException("Failed to fill read buffer! Error code:" + err); -+ } -+ cephPos += bufValid; -+ return (bufValid != 0); -+ } -+ -+ /* -+ * Get the current position of the stream. -+ */ -+ public synchronized long getPos() throws IOException { -+ return cephPos - bufValid + bufPos; -+ } -+ -+ /** -+ * Find the number of bytes remaining in the file. -+ */ -+ @Override -+ public synchronized int available() throws IOException { -+ return (int) (fileLength - getPos()); -+ } -+ -+ public synchronized void seek(long targetPos) throws IOException { -+ LOG.trace( -+ "CephInputStream.seek: Seeking to position " + targetPos + " on fd " -+ + fileHandle); -+ if (targetPos > fileLength) { -+ throw new IOException( -+ "CephInputStream.seek: failed seek to position " + targetPos -+ + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); -+ } -+ long oldPos = cephPos; -+ -+ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos); -+ bufValid = 0; -+ bufPos = 0; -+ if (cephPos < 0) { -+ cephPos = oldPos; -+ throw new IOException("Ceph failed to seek to new position!"); -+ } -+ } -+ -+ /** -+ * Failovers are handled by the Ceph code at a very low level; -+ * if there are issues that can be solved by changing sources -+ * they'll be dealt with before anybody even tries to call this method! -+ * @return false. 
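getPos() above reports cephPos - bufValid + bufPos, i.e. the position of the underlying Ceph filehandle minus the part of the local buffer the caller has not consumed yet. A worked illustration with made-up numbers:

public class BufferPosSketch {
  public static void main(String[] args) {
    long cephPos = 0;              // position of the underlying Ceph filehandle
    int bufValid = 0, bufPos = 0;  // local buffer bookkeeping

    // fillBuffer(): one 4096-byte read advances the Ceph-side position
    int bytesRead = 4096;
    cephPos += bytesRead;
    bufValid = bytesRead;
    bufPos = 0;

    // the caller consumes 1024 bytes out of the local buffer
    bufPos += 1024;

    long reported = cephPos - bufValid + bufPos;   // the getPos() formula
    System.out.println(reported);                  // 1024: the 3072 buffered bytes are still unread
  }
}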
-+ */ -+ public synchronized boolean seekToNewSource(long targetPos) { -+ return false; -+ } -+ -+ /** -+ * Read a byte from the file. -+ * @return the next byte. -+ */ -+ @Override -+ public synchronized int read() throws IOException { -+ LOG.trace( -+ "CephInputStream.read: Reading a single byte from fd " + fileHandle -+ + " by calling general read function"); -+ -+ byte result[] = new byte[1]; -+ -+ if (getPos() >= fileLength) { -+ return -1; -+ } -+ if (-1 == read(result, 0, 1)) { -+ return -1; -+ } -+ if (result[0] < 0) { -+ return 256 + (int) result[0]; -+ } else { -+ return result[0]; -+ } -+ } -+ -+ /** -+ * Read a specified number of bytes from the file into a byte[]. -+ * @param buf the byte array to read into. -+ * @param off the offset to start at in the file -+ * @param len the number of bytes to read -+ * @return 0 if successful, otherwise an error code. -+ * @throws IOException on bad input. -+ */ -+ @Override -+ public synchronized int read(byte buf[], int off, int len) -+ throws IOException { -+ LOG.trace( -+ "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle); -+ -+ if (closed) { -+ throw new IOException( -+ "CephInputStream.read: cannot read " + len + " bytes from fd " -+ + fileHandle + ": stream closed"); -+ } -+ -+ // ensure we're not past the end of the file -+ if (getPos() >= fileLength) { -+ LOG.debug( -+ "CephInputStream.read: cannot read " + len + " bytes from fd " -+ + fileHandle + ": current position is " + getPos() -+ + " and file length is " + fileLength); -+ -+ return -1; -+ } -+ -+ int totalRead = 0; -+ int initialLen = len; -+ int read; -+ -+ do { -+ read = Math.min(len, bufValid - bufPos); -+ try { -+ System.arraycopy(buffer, bufPos, buf, off, read); -+ } catch (IndexOutOfBoundsException ie) { -+ throw new IOException( -+ "CephInputStream.read: Indices out of bounds:" + "read length is " -+ + len + ", buffer offset is " + off + ", and buffer size is " -+ + buf.length); -+ } catch (ArrayStoreException ae) { -+ throw new IOException( -+ "Uh-oh, CephInputStream failed to do an array" -+ + "copy due to type mismatch..."); -+ } catch (NullPointerException ne) { -+ throw new IOException( -+ "CephInputStream.read: cannot read " + len + "bytes from fd:" -+ + fileHandle + ": buf is null"); -+ } -+ bufPos += read; -+ len -= read; -+ off += read; -+ totalRead += read; -+ } while (len > 0 && fillBuffer()); -+ -+ LOG.trace( -+ "CephInputStream.read: Reading " + initialLen + " bytes from fd " -+ + fileHandle + ": succeeded in reading " + totalRead + " bytes"); -+ return totalRead; -+ } -+ -+ /** -+ * Close the CephInputStream and release the associated filehandle. -+ */ -+ @Override -+ public void close() throws IOException { -+ LOG.trace("CephOutputStream.close:enter"); -+ if (!closed) { -+ int result = ceph.ceph_close(fileHandle); -+ -+ closed = true; -+ if (result != 0) { -+ throw new IOException( -+ "Close somehow failed!" -+ + "Don't try and use this stream again, though"); -+ } -+ LOG.trace("CephOutputStream.close:exit"); -+ } -+ } -+} -diff --git a/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java -new file mode 100644 -index 0000000..4c50f88 ---- /dev/null -+++ b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java -@@ -0,0 +1,219 @@ -+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- -+ -+/** -+ * -+ * Licensed under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. 
You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -+ * implied. See the License for the specific language governing -+ * permissions and limitations under the License. -+ * -+ * -+ * Implements the Hadoop FS interfaces to allow applications to store -+ * files in Ceph. -+ */ -+ -+package org.apache.hadoop.fs.ceph; -+ -+ -+import java.io.IOException; -+import java.io.OutputStream; -+ -+import org.apache.commons.logging.Log; -+import org.apache.commons.logging.LogFactory; -+import org.apache.hadoop.conf.Configuration; -+import org.apache.hadoop.util.Progressable; -+ -+ -+/** -+ * <p> -+ * An {@link OutputStream} for a CephFileSystem and corresponding -+ * Ceph instance. -+ */ -+public class CephOutputStream extends OutputStream { -+ private static final Log LOG = LogFactory.getLog(CephOutputStream.class); -+ private boolean closed; -+ -+ private CephFS ceph; -+ -+ private int fileHandle; -+ -+ private byte[] buffer; -+ private int bufUsed = 0; -+ -+ /** -+ * Construct the CephOutputStream. -+ * @param conf The FileSystem configuration. -+ * @param fh The Ceph filehandle to connect to. -+ */ -+ public CephOutputStream(Configuration conf, CephFS cephfs, -+ int fh, int bufferSize) { -+ ceph = cephfs; -+ fileHandle = fh; -+ closed = false; -+ buffer = new byte[bufferSize]; -+ } -+ -+ /** Ceph likes things to be closed before it shuts down, -+ *so closing the IOStream stuff voluntarily is good -+ */ -+ protected void finalize() throws Throwable { -+ try { -+ if (!closed) { -+ close(); -+ } -+ } finally { -+ super.finalize(); -+ } -+ } -+ -+ /** -+ * Get the current position in the file. -+ * @return The file offset in bytes. -+ */ -+ public long getPos() throws IOException { -+ return ceph.ceph_getpos(fileHandle); -+ } -+ -+ /** -+ * Write a byte. -+ * @param b The byte to write. -+ * @throws IOException If you have closed the CephOutputStream or the -+ * write fails. -+ */ -+ @Override -+ public synchronized void write(int b) throws IOException { -+ LOG.trace( -+ "CephOutputStream.write: writing a single byte to fd " + fileHandle); -+ -+ if (closed) { -+ throw new IOException( -+ "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle -+ + ": stream closed"); -+ } -+ // Stick the byte in a buffer and write it -+ byte buf[] = new byte[1]; -+ -+ buf[0] = (byte) b; -+ write(buf, 0, 1); -+ return; -+ } -+ -+ /** -+ * Write a byte buffer into the Ceph file. -+ * @param buf the byte array to write from -+ * @param off the position in the file to start writing at. -+ * @param len The number of bytes to actually write. -+ * @throws IOException if you have closed the CephOutputStream, or -+ * if buf is null or off + len > buf.length, or -+ * if the write fails due to a Ceph error. 
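Writes to this stream are staged in the bufferSize array allocated in the constructor and only pushed down via ceph_write once the buffer fills or the stream is flushed or closed, so callers should not assume durability before that. A usage sketch; the path and contents are hypothetical:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CephWriteSketch {
  static void writeGreeting(FileSystem cephFs) throws IOException {
    Path out = new Path("/user/hadoop/greeting.txt");   // hypothetical path
    FSDataOutputStream os = cephFs.create(out);
    try {
      os.write("hello ceph".getBytes("UTF-8"));         // lands in the Java-side buffer
      os.flush();                                       // pushes the partial buffer down via ceph_write
    } finally {
      os.close();                                       // flushes again and releases the filehandle
    }
  }
}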
-+ */ -+ @Override -+ public synchronized void write(byte buf[], int off, int len) throws IOException { -+ LOG.trace( -+ "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle); -+ // make sure stream is open -+ if (closed) { -+ throw new IOException( -+ "CephOutputStream.write: cannot write " + len + "bytes to fd " -+ + fileHandle + ": stream closed"); -+ } -+ -+ int result; -+ int write; -+ -+ while (len > 0) { -+ write = Math.min(len, buffer.length - bufUsed); -+ try { -+ System.arraycopy(buf, off, buffer, bufUsed, write); -+ } catch (IndexOutOfBoundsException ie) { -+ throw new IOException( -+ "CephOutputStream.write: Indices out of bounds: " -+ + "write length is " + len + ", buffer offset is " + off -+ + ", and buffer size is " + buf.length); -+ } catch (ArrayStoreException ae) { -+ throw new IOException( -+ "Uh-oh, CephOutputStream failed to do an array" -+ + " copy due to type mismatch..."); -+ } catch (NullPointerException ne) { -+ throw new IOException( -+ "CephOutputStream.write: cannot write " + len + "bytes to fd " -+ + fileHandle + ": buffer is null"); -+ } -+ bufUsed += write; -+ len -= write; -+ off += write; -+ if (bufUsed == buffer.length) { -+ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); -+ if (result < 0) { -+ throw new IOException( -+ "CephOutputStream.write: Buffered write of " + bufUsed -+ + " bytes failed!"); -+ } -+ if (result != bufUsed) { -+ throw new IOException( -+ "CephOutputStream.write: Wrote only " + result + " bytes of " -+ + bufUsed + " in buffer! Data may be lost or written" -+ + " twice to Ceph!"); -+ } -+ bufUsed = 0; -+ } -+ -+ } -+ return; -+ } -+ -+ /** -+ * Flush the buffered data. -+ * @throws IOException if you've closed the stream or the write fails. -+ */ -+ @Override -+ public synchronized void flush() throws IOException { -+ if (!closed) { -+ if (bufUsed == 0) { -+ return; -+ } -+ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); -+ -+ if (result < 0) { -+ throw new IOException( -+ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd " -+ + fileHandle + " failed"); -+ } -+ if (result != bufUsed) { -+ throw new IOException( -+ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd " -+ + fileHandle + "was incomplete: only " + result + " of " + bufUsed -+ + " bytes were written."); -+ } -+ bufUsed = 0; -+ return; -+ } -+ } -+ -+ /** -+ * Close the CephOutputStream. -+ * @throws IOException if Ceph somehow returns an error. In current code it can't. -+ */ -+ @Override -+ public synchronized void close() throws IOException { -+ LOG.trace("CephOutputStream.close:enter"); -+ if (!closed) { -+ flush(); -+ int result = ceph.ceph_close(fileHandle); -+ -+ if (result != 0) { -+ throw new IOException("Close failed!"); -+ } -+ -+ closed = true; -+ LOG.trace("CephOutputStream.close:exit"); -+ } -+ } -+} -diff --git a/src/core/org/apache/hadoop/fs/ceph/CephTalker.java b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java -new file mode 100644 -index 0000000..569652f ---- /dev/null -+++ b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java -@@ -0,0 +1,91 @@ -+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- -+ -+/** -+ * -+ * Licensed under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. 
You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -+ * implied. See the License for the specific language governing -+ * permissions and limitations under the License. -+ * -+ * -+ * Wraps a number of native function calls to communicate with the Ceph -+ * filesystem. -+ */ -+package org.apache.hadoop.fs.ceph; -+ -+ -+import org.apache.hadoop.conf.Configuration; -+import org.apache.commons.logging.Log; -+ -+ -+class CephTalker extends CephFS { -+ // JNI doesn't give us any way to store pointers, so use a long. -+ // Here we're assuming pointers aren't longer than 8 bytes. -+ long cluster; -+ -+ // we write a constructor so we can load the libraries -+ public CephTalker(Configuration conf, Log log) { -+ System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so"); -+ System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so"); -+ cluster = 0; -+ } -+ -+ protected native boolean ceph_initializeClient(String arguments, int block_size); -+ -+ protected native String ceph_getcwd(); -+ -+ protected native boolean ceph_setcwd(String path); -+ -+ protected native boolean ceph_rmdir(String path); -+ -+ protected native boolean ceph_unlink(String path); -+ -+ protected native boolean ceph_rename(String old_path, String new_path); -+ -+ protected native boolean ceph_exists(String path); -+ -+ protected native long ceph_getblocksize(String path); -+ -+ protected native boolean ceph_isdirectory(String path); -+ -+ protected native boolean ceph_isfile(String path); -+ -+ protected native String[] ceph_getdir(String path); -+ -+ protected native int ceph_mkdirs(String path, int mode); -+ -+ protected native int ceph_open_for_append(String path); -+ -+ protected native int ceph_open_for_read(String path); -+ -+ protected native int ceph_open_for_overwrite(String path, int mode); -+ -+ protected native int ceph_close(int filehandle); -+ -+ protected native boolean ceph_setPermission(String path, int mode); -+ -+ protected native boolean ceph_kill_client(); -+ -+ protected native boolean ceph_stat(String path, CephFileSystem.Stat fill); -+ -+ protected native int ceph_replication(String Path); -+ -+ protected native String[] ceph_hosts(int fh, long offset); -+ -+ protected native int ceph_setTimes(String path, long mtime, long atime); -+ -+ protected native long ceph_getpos(int fh); -+ -+ protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); -+ -+ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); -+ -+ protected native long ceph_seek_from_start(int fh, long pos); -+} -diff --git a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java -index 9e22f1f..cd55361 100644 ---- a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java -+++ b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java -@@ -386,10 +386,12 @@ public class TrackerDistributedCacheManager { - if (modifiedTime != desiredTimestamp) { - DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT, - DateFormat.SHORT); -+ /* - throw new IOException("The distributed cache object " + source + - " changed during the job from " + - df.format(new Date(desiredTimestamp)) + " to " + - 
df.format(new Date(modifiedTime))); -+ */ - } - - Path parchive = null; -diff --git a/src/test/commit-tests b/src/test/commit-tests -index 1148c8b..85fa53d 100644 ---- a/src/test/commit-tests -+++ b/src/test/commit-tests -@@ -53,6 +53,7 @@ - **/TestRPC.java - **/TestS3Credentials.java - **/TestS3FileSystem.java -+**/TestCeph.java - **/TestSaslRPC.java - **/TestScriptBasedMapping.java - **/TestSequenceFileSerialization.java -diff --git a/src/test/org/apache/hadoop/fs/ceph/TestCeph.java b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java -new file mode 100644 -index 0000000..e46b0ee ---- /dev/null -+++ b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java -@@ -0,0 +1,45 @@ -+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- -+ -+/** -+ * Licensed to the Apache Software Foundation (ASF) under one -+ * or more contributor license agreements. See the NOTICE file -+ * distributed with this work for additional information -+ * regarding copyright ownership. The ASF licenses this file -+ * to you under the Apache License, Version 2.0 (the -+ * "License"); you may not use this file except in compliance -+ * with the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ * -+ * Unit tests for the CephFileSystem API implementation. -+ */ -+ -+package org.apache.hadoop.fs.ceph; -+ -+ -+import java.io.IOException; -+import java.net.URI; -+import org.apache.hadoop.conf.Configuration; -+import org.apache.hadoop.fs.FileSystemContractBaseTest; -+import org.apache.hadoop.fs.FileSystem; -+import org.apache.hadoop.fs.Path; -+ -+ -+public class TestCeph extends FileSystemContractBaseTest { -+ -+ @Override -+ protected void setUp() throws IOException { -+ Configuration conf = new Configuration(); -+ CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG); -+ CephFileSystem cephfs = new CephFileSystem(cephfaker); -+ -+ cephfs.initialize(URI.create("ceph://null"), conf); -+ fs = cephfs; -+ } -+} diff --git a/src/client/hadoop/Readme b/src/client/hadoop/Readme deleted file mode 100644 index 2967b96cf5a..00000000000 --- a/src/client/hadoop/Readme +++ /dev/null @@ -1,17 +0,0 @@ -This directory contains: -CephFSInterface.cc/h: A C++ JNI library used by the Hadoop Java code. -ceph: A directory containing all the Java source files for a -Hadoop-compliant CephFileSystem. -HADOOP-ceph.patch: A patch for Hadoop. It should apply fine to one of the -.20 branches. (It was generated against .20.205.0) This -patch adds in all the files contained in the ceph dir as well as making -some changes so that Hadoop's configuration code will recognize the -CephFileSystem properties and classes. It is possible that this will be -out-of-date compared to the files contained in the ceph dir, so you -should apply this patch and then copy ceph/* into the appropriate Hadoop -dir. - -There are also a number of javah-generated C header files which are used -in writing CephFSInterface but can be safely ignored otherwise. - -Configuration instructions are included in Javadoc format in the ceph dir. 
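TestCeph above wires a CephFaker into CephFileSystem so the stock Hadoop contract tests run against the local filesystem instead of a cluster. A sketch of the same wiring used as a standalone smoke test; it has to live in the org.apache.hadoop.fs.ceph package because CephFaker is package-private, and the file name and contents are made up:

package org.apache.hadoop.fs.ceph;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CephFakerSmoke {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CephFaker faker = new CephFaker(conf, FileSystem.LOG);   // local-FS stand-in for a cluster
    CephFileSystem fs = new CephFileSystem(faker);
    fs.initialize(URI.create("ceph://null"), conf);

    Path p = new Path("/smoke/hello.txt");                   // made-up test file
    FSDataOutputStream out = fs.create(p);
    out.write("round trip".getBytes("UTF-8"));
    out.close();

    byte[] buf = new byte[32];
    FSDataInputStream in = fs.open(p);
    int n = in.read(buf);
    in.close();
    System.out.println(new String(buf, 0, n, "UTF-8"));      // prints "round trip"

    fs.close();
  }
}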
diff --git a/src/client/hadoop/ceph/CephFS.java b/src/client/hadoop/ceph/CephFS.java deleted file mode 100644 index 5d51eb21600..00000000000 --- a/src/client/hadoop/ceph/CephFS.java +++ /dev/null @@ -1,250 +0,0 @@ -// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- - -/** - * - * Licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - * - * - * Abstract base class for communicating with a Ceph filesystem and its - * C++ codebase from Java, or pretending to do so (for unit testing purposes). - * As only the Ceph package should be using this directly, all methods - * are protected. - */ -package org.apache.hadoop.fs.ceph; - -import org.apache.hadoop.conf.Configuration; - -abstract class CephFS { - - protected static final int ENOTDIR = 20; - protected static final int EEXIST = 17; - protected static final int ENOENT = 2; - - /* - * Performs any necessary setup to allow general use of the filesystem. - * Inputs: - * String argsuments -- a command-line style input of Ceph config params - * int block_size -- the size in bytes to use for blocks - * Returns: true on success, false otherwise - */ - abstract protected boolean ceph_initializeClient(String arguments, int block_size); - - /* - * Returns the current working directory (absolute) as a String - */ - abstract protected String ceph_getcwd(); - - /* - * Changes the working directory. - * Inputs: - * String path: The path (relative or absolute) to switch to - * Returns: true on success, false otherwise. - */ - abstract protected boolean ceph_setcwd(String path); - - /* - * Given a path to a directory, removes the directory if empty. - * Inputs: - * jstring j_path: The path (relative or absolute) to the directory - * Returns: true on successful delete; false otherwise - */ - abstract protected boolean ceph_rmdir(String path); - - /* - * Given a path, unlinks it. - * Inputs: - * String path: The path (relative or absolute) to the file or empty dir - * Returns: true if the unlink occurred, false otherwise. - */ - abstract protected boolean ceph_unlink(String path); - - /* - * Changes a given path name to a new name, assuming new_path doesn't exist. - * Inputs: - * jstring j_from: The path whose name you want to change. - * jstring j_to: The new name for the path. - * Returns: true if the rename occurred, false otherwise - */ - abstract protected boolean ceph_rename(String old_path, String new_path); - - /* - * Returns true if it the input path exists, false - * if it does not or there is an unexpected failure. - */ - abstract protected boolean ceph_exists(String path); - - /* - * Get the block size for a given path. - * Input: - * String path: The path (relative or absolute) you want - * the block size for. - * Returns: block size if the path exists, otherwise a negative number - * corresponding to the standard C++ error codes (which are positive). - */ - abstract protected long ceph_getblocksize(String path); - - /* - * Returns true if the given path is a directory, false otherwise. 
- */ - abstract protected boolean ceph_isdirectory(String path); - - /* - * Returns true if the given path is a file; false otherwise. - */ - abstract protected boolean ceph_isfile(String path); - - /* - * Get the contents of a given directory. - * Inputs: - * String path: The path (relative or absolute) to the directory. - * Returns: A Java String[] of the contents of the directory, or - * NULL if there is an error (ie, path is not a dir). This listing - * will not contain . or .. entries. - */ - abstract protected String[] ceph_getdir(String path); - - /* - * Create the specified directory and any required intermediate ones with the - * given mode. - */ - abstract protected int ceph_mkdirs(String path, int mode); - - /* - * Open a file to append. If the file does not exist, it will be created. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ - abstract protected int ceph_open_for_append(String path); - - /* - * Open a file for reading. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ - abstract protected int ceph_open_for_read(String path); - - /* - * Opens a file for overwriting; creates it if necessary. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * int mode: The mode to open with. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ - abstract protected int ceph_open_for_overwrite(String path, int mode); - - /* - * Closes the given file. Returns 0 on success, or a negative - * error code otherwise. - */ - abstract protected int ceph_close(int filehandle); - - /* - * Change the mode on a path. - * Inputs: - * String path: The path to change mode on. - * int mode: The mode to apply. - * Returns: true if the mode is properly applied, false if there - * is any error. - */ - abstract protected boolean ceph_setPermission(String path, int mode); - - /* - * Closes the Ceph client. This should be called before shutting down - * (multiple times is okay but redundant). - */ - abstract protected boolean ceph_kill_client(); - - /* - * Get the statistics on a path returned in a custom format defined - * in CephFileSystem. - * Inputs: - * String path: The path to stat. - * Stat fill: The stat object to fill. - * Returns: true if the stat is successful, false otherwise. - */ - abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill); - - /* - * Check how many times a file should be replicated. If it is, - * degraded it may not actually be replicated this often. - * Inputs: - * int fh: a file descriptor - * Returns: an int containing the number of times replicated. - */ - abstract protected int ceph_replication(String path); - - /* - * Find the IP address of the primary OSD for a given file and offset. - * Inputs: - * int fh: The filehandle for the file. - * long offset: The offset to get the location of. - * Returns: an array of String of the location as IP, or NULL if there is an error. - */ - abstract protected String[] ceph_hosts(int fh, long offset); - - /* - * Set the mtime and atime for a given path. - * Inputs: - * String path: The path to set the times for. - * long mtime: The mtime to set, in millis since epoch (-1 to not set). 
- * long atime: The atime to set, in millis since epoch (-1 to not set) - * Returns: 0 if successful, an error code otherwise. - */ - abstract protected int ceph_setTimes(String path, long mtime, long atime); - - /* - * Get the current position in a file (as a long) of a given filehandle. - * Returns: (long) current file position on success, or a - * negative error code on failure. - */ - abstract protected long ceph_getpos(int fh); - - /* - * Write the given buffer contents to the given filehandle. - * Inputs: - * int fh: The filehandle to write to. - * byte[] buffer: The buffer to write from - * int buffer_offset: The position in the buffer to write from - * int length: The number of (sequential) bytes to write. - * Returns: int, on success the number of bytes written, on failure - * a negative error code. - */ - abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); - - /* - * Reads into the given byte array from the current position. - * Inputs: - * int fh: the filehandle to read from - * byte[] buffer: the byte array to read into - * int buffer_offset: where in the buffer to start writing - * int length: how much to read. - * There'd better be enough space in the buffer to write all - * the data from the given offset! - * Returns: the number of bytes read on success (as an int), - * or an error code otherwise. */ - abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); - - /* - * Seeks to the given position in the given file. - * Inputs: - * int fh: The filehandle to seek in. - * long pos: The position to seek to. - * Returns: the new position (as a long) of the filehandle on success, - * or a negative error code on failure. */ - abstract protected long ceph_seek_from_start(int fh, long pos); -} diff --git a/src/client/hadoop/ceph/CephFaker.java b/src/client/hadoop/ceph/CephFaker.java deleted file mode 100644 index c598f536039..00000000000 --- a/src/client/hadoop/ceph/CephFaker.java +++ /dev/null @@ -1,483 +0,0 @@ -// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- - -/** - * - * Licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - * - * - * This uses the local Filesystem but pretends to be communicating - * with a Ceph deployment, for unit testing the CephFileSystem. 
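Before the fake implementation begins, it may help to restate the calling convention the CephFS javadoc above describes: opens return an int filehandle (negative on error), ceph_read/ceph_write return a byte count or a negative error code, and ceph_close releases the handle. A rough usage sketch under those assumptions is shown below; readWholeFile is an invented name, the enclosing class is omitted for brevity, and the code would have to live in the org.apache.hadoop.fs.ceph package because the methods are protected.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    static byte[] readWholeFile(CephFS cephfs, String path) throws IOException {
      int fd = cephfs.ceph_open_for_read(path);      // a negative value signals an error
      if (fd < 0) {
        throw new IOException("ceph_open_for_read(" + path + ") failed: " + fd);
      }
      try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[64 * 1024];
        int n;
        while ((n = cephfs.ceph_read(fd, buf, 0, buf.length)) > 0) {
          out.write(buf, 0, n);                      // n bytes were actually read
        }
        if (n < 0) {
          throw new IOException("ceph_read failed with error code " + n);
        }
        return out.toByteArray();
      } finally {
        cephfs.ceph_close(fd);                       // 0 on success, negative otherwise
      }
    }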
- */ - -package org.apache.hadoop.fs.ceph; - - -import java.net.URI; -import java.util.Hashtable; -import java.io.Closeable; -import java.io.FileNotFoundException; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; - - -class CephFaker extends CephFS { - private static final Log LOG = LogFactory.getLog(CephFaker.class); - FileSystem localFS; - String localPrefix; - int blockSize; - Configuration conf; - Hashtable<Integer, Object> files; - Hashtable<Integer, String> filenames; - int fileCount = 0; - boolean initialized = false; - - public CephFaker(Configuration con, Log log) { - conf = con; - files = new Hashtable<Integer, Object>(); - filenames = new Hashtable<Integer, String>(); - } - - protected boolean ceph_initializeClient(String args, int block_size) { - if (!initialized) { - // let's remember the default block_size - blockSize = block_size; - - /* for a real Ceph deployment, this starts up the client, - * sets debugging levels, etc. We just need to get the - * local FileSystem to use, and we'll ignore any - * command-line arguments. */ - try { - localFS = FileSystem.getLocal(conf); - localFS.initialize(URI.create("file://localhost"), conf); - localFS.setVerifyChecksum(false); - String testDir = conf.get("hadoop.tmp.dir"); - - localPrefix = localFS.getWorkingDirectory().toString(); - int testDirLoc = localPrefix.indexOf(testDir) - 1; - - if (-2 == testDirLoc) { - testDirLoc = localPrefix.length(); - } - localPrefix = localPrefix.substring(0, testDirLoc) + "/" - + conf.get("hadoop.tmp.dir"); - - localFS.setWorkingDirectory( - new Path(localPrefix + "/user/" + System.getProperty("user.name"))); - // I don't know why, but the unit tests expect the default - // working dir to be /user/username, so satisfy them! 
- // debug("localPrefix is " + localPrefix, INFO); - } catch (IOException e) { - return false; - } - initialized = true; - } - return true; - } - - protected String ceph_getcwd() { - return sanitize_path(localFS.getWorkingDirectory().toString()); - } - - protected boolean ceph_setcwd(String path) { - localFS.setWorkingDirectory(new Path(prepare_path(path))); - return true; - } - - // the caller is responsible for ensuring empty dirs - protected boolean ceph_rmdir(String pth) { - Path path = new Path(prepare_path(pth)); - boolean ret = false; - - try { - if (localFS.listStatus(path).length <= 1) { - ret = localFS.delete(path, true); - } - } catch (IOException e) {} - return ret; - } - - // this needs to work on (empty) directories too - protected boolean ceph_unlink(String path) { - path = prepare_path(path); - boolean ret = false; - - if (ceph_isdirectory(path)) { - ret = ceph_rmdir(path); - } else { - try { - ret = localFS.delete(new Path(path), false); - } catch (IOException e) {} - } - return ret; - } - - protected boolean ceph_rename(String oldName, String newName) { - oldName = prepare_path(oldName); - newName = prepare_path(newName); - try { - Path parent = new Path(newName).getParent(); - Path newPath = new Path(newName); - - if (localFS.exists(parent) && !localFS.exists(newPath)) { - return localFS.rename(new Path(oldName), newPath); - } - return false; - } catch (IOException e) { - return false; - } - } - - protected boolean ceph_exists(String path) { - path = prepare_path(path); - boolean ret = false; - - try { - ret = localFS.exists(new Path(path)); - } catch (IOException e) {} - return ret; - } - - protected long ceph_getblocksize(String path) { - path = prepare_path(path); - try { - FileStatus status = localFS.getFileStatus(new Path(path)); - - return status.getBlockSize(); - } catch (FileNotFoundException e) { - return -CephFS.ENOENT; - } catch (IOException e) { - return -1; // just fail generically - } - } - - protected boolean ceph_isdirectory(String path) { - path = prepare_path(path); - try { - FileStatus status = localFS.getFileStatus(new Path(path)); - - return status.isDir(); - } catch (IOException e) { - return false; - } - } - - protected boolean ceph_isfile(String path) { - path = prepare_path(path); - boolean ret = false; - - try { - FileStatus status = localFS.getFileStatus(new Path(path)); - - ret = !status.isDir(); - } catch (Exception e) {} - return ret; - } - - protected String[] ceph_getdir(String path) { - path = prepare_path(path); - if (!ceph_isdirectory(path)) { - return null; - } - try { - FileStatus[] stats = localFS.listStatus(new Path(path)); - String[] names = new String[stats.length]; - String name; - - for (int i = 0; i < stats.length; ++i) { - name = stats[i].getPath().toString(); - names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1); - } - return names; - } catch (IOException e) {} - return null; - } - - protected int ceph_mkdirs(String path, int mode) { - path = prepare_path(path); - // debug("ceph_mkdirs on " + path, INFO); - try { - if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) { - return 0; - } - } catch (IOException e) {} - if (ceph_isdirectory(path)) { // apparently it already existed - return -EEXIST; - } else if (ceph_isfile(path)) { - return -ENOTDIR; - } - return -1; - } - - /* - * Unlike a real Ceph deployment, you can't do opens on a directory. - * Since that has unpredictable behavior and you shouldn't do it anyway, - * it's okay. 
- */ - protected int ceph_open_for_append(String path) { - path = prepare_path(path); - FSDataOutputStream stream; - - try { - stream = localFS.append(new Path(path)); - files.put(new Integer(fileCount), stream); - filenames.put(new Integer(fileCount), path); - return fileCount++; - } catch (IOException e) {} - return -1; // failure - } - - protected int ceph_open_for_read(String path) { - path = prepare_path(path); - FSDataInputStream stream; - - try { - stream = localFS.open(new Path(path)); - files.put(new Integer(fileCount), stream); - filenames.put(new Integer(fileCount), path); - LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path); - return fileCount++; - } catch (IOException e) {} - return -1; // failure - } - - protected int ceph_open_for_overwrite(String path, int mode) { - path = prepare_path(path); - FSDataOutputStream stream; - - try { - stream = localFS.create(new Path(path)); - files.put(new Integer(fileCount), stream); - filenames.put(new Integer(fileCount), path); - LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path); - return fileCount++; - } catch (IOException e) {} - return -1; // failure - } - - protected int ceph_close(int filehandle) { - LOG.info("ceph_close(filehandle " + filehandle + ")"); - try { - ((Closeable) files.get(new Integer(filehandle))).close(); - if (null == files.get(new Integer(filehandle))) { - return -ENOENT; // this isn't quite the right error code, - // but the important part is it's negative - } - return 0; // hurray, success - } catch (NullPointerException ne) { - LOG.warn("ceph_close caught NullPointerException!" + ne); - } // err, how? - catch (IOException ie) { - LOG.warn("ceph_close caught IOException!" + ie); - } - return -1; // failure - } - - protected boolean ceph_setPermission(String pth, int mode) { - pth = prepare_path(pth); - Path path = new Path(pth); - boolean ret = false; - - try { - localFS.setPermission(path, new FsPermission((short) mode)); - ret = true; - } catch (IOException e) {} - return ret; - } - - // rather than try and match a Ceph deployment's behavior exactly, - // just make bad things happen if they try and call methods after this - protected boolean ceph_kill_client() { - // debug("ceph_kill_client", INFO); - localFS.setWorkingDirectory(new Path(localPrefix)); - // debug("working dir is now " + localFS.getWorkingDirectory(), INFO); - try { - localFS.close(); - } catch (Exception e) {} - localFS = null; - files = null; - filenames = null; - return true; - } - - protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) { - pth = prepare_path(pth); - Path path = new Path(pth); - boolean ret = false; - - try { - FileStatus status = localFS.getFileStatus(path); - - fill.size = status.getLen(); - fill.is_dir = status.isDir(); - fill.block_size = status.getBlockSize(); - fill.mod_time = status.getModificationTime(); - fill.access_time = status.getAccessTime(); - fill.mode = status.getPermission().toShort(); - ret = true; - } catch (IOException e) {} - return ret; - } - - protected int ceph_replication(String path) { - path = prepare_path(path); - int ret = -1; // -1 for failure - - try { - ret = localFS.getFileStatus(new Path(path)).getReplication(); - } catch (IOException e) {} - return ret; - } - - protected String[] ceph_hosts(int fh, long offset) { - String[] ret = null; - - try { - BlockLocation[] locs = localFS.getFileBlockLocations( - localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))), - offset, 1); - - ret = locs[0].getNames(); - } catch (IOException e) 
{} catch (NullPointerException f) {} - return ret; - } - - protected int ceph_setTimes(String pth, long mtime, long atime) { - pth = prepare_path(pth); - Path path = new Path(pth); - int ret = -1; // generic fail - - try { - localFS.setTimes(path, mtime, atime); - ret = 0; - } catch (IOException e) {} - return ret; - } - - protected long ceph_getpos(int fh) { - long ret = -1; // generic fail - - try { - Object stream = files.get(new Integer(fh)); - - if (stream instanceof FSDataInputStream) { - ret = ((FSDataInputStream) stream).getPos(); - } else if (stream instanceof FSDataOutputStream) { - ret = ((FSDataOutputStream) stream).getPos(); - } - } catch (IOException e) {} catch (NullPointerException f) {} - return ret; - } - - protected int ceph_write(int fh, byte[] buffer, - int buffer_offset, int length) { - LOG.info( - "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:" - + length); - long ret = -1; // generic fail - - try { - FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh)); - - LOG.info("ceph_write got outputstream"); - long startPos = os.getPos(); - - os.write(buffer, buffer_offset, length); - ret = os.getPos() - startPos; - } catch (IOException e) { - LOG.warn("ceph_write caught IOException!"); - } catch (NullPointerException f) { - LOG.warn("ceph_write caught NullPointerException!"); - } - return (int) ret; - } - - protected int ceph_read(int fh, byte[] buffer, - int buffer_offset, int length) { - long ret = -1; // generic fail - - try { - FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh)); - long startPos = is.getPos(); - - is.read(buffer, buffer_offset, length); - ret = is.getPos() - startPos; - } catch (IOException e) {} catch (NullPointerException f) {} - return (int) ret; - } - - protected long ceph_seek_from_start(int fh, long pos) { - LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")"); - long ret = -1; // generic fail - - try { - LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh))); - if (null == files.get(new Integer(fh))) { - LOG.warn("ceph_seek_from_start: is is null!"); - } - FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh)); - - LOG.info("ceph_seek_from_start retrieved is!"); - is.seek(pos); - ret = is.getPos(); - } catch (IOException e) { - LOG.warn("ceph_seek_from_start caught IOException!"); - } catch (NullPointerException f) { - LOG.warn("ceph_seek_from_start caught NullPointerException!"); - } - return (int) ret; - } - - /* - * We need to remove the localFS file prefix before returning to Ceph - */ - private String sanitize_path(String path) { - // debug("sanitize_path(" + path + ")", INFO); - /* if (path.startsWith("file:")) - path = path.substring("file:".length()); */ - if (path.startsWith(localPrefix)) { - path = path.substring(localPrefix.length()); - if (path.length() == 0) { // it was a root path - path = "/"; - } - } - // debug("sanitize_path returning " + path, INFO); - return path; - } - - /* - * If it's an absolute path we need to shove the - * test dir onto the front as a prefix. - */ - private String prepare_path(String path) { - // debug("prepare_path(" + path + ")", INFO); - if (path.startsWith("/")) { - path = localPrefix + path; - } else if (path.equals("..")) { - if (ceph_getcwd().equals("/")) { - path = "."; - } // you can't go up past root! 
- } - // debug("prepare_path returning" + path, INFO); - return path; - } -} diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java deleted file mode 100644 index 95f22238b4d..00000000000 --- a/src/client/hadoop/ceph/CephFileSystem.java +++ /dev/null @@ -1,804 +0,0 @@ -// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- - -/** - * - * Licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - * - * - * Implements the Hadoop FS interfaces to allow applications to store - * files in Ceph. - */ -package org.apache.hadoop.fs.ceph; - - -import java.io.IOException; -import java.io.FileNotFoundException; -import java.io.OutputStream; -import java.net.URI; -import java.net.InetAddress; -import java.util.EnumSet; -import java.lang.Math; -import java.util.ArrayList; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.net.DNS; - - -/** - * <p> - * A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph.</a>. - * This will not start a Ceph instance; one must already be running. - * </p> - * Configuration of the CephFileSystem is handled via a few Hadoop - * Configuration properties: <br> - * fs.ceph.monAddr -- the ip address/port of the monitor to connect to. <br> - * fs.ceph.libDir -- the directory that libcephfs and libhadoopceph are - * located in. This assumes Hadoop is being run on a linux-style machine - * with names like libcephfs.so. - * fs.ceph.commandLine -- if you prefer you can fill in this property - * just as you would when starting Ceph up from the command line. Specific - * properties override any configuration specified here. - * <p> - * You can also enable debugging of the CephFileSystem and Ceph itself: <br> - * fs.ceph.debug -- if 'true' will print out method enter/exit messages, - * plus a little more. - * fs.ceph.clientDebug/fs.ceph.messengerDebug -- will print out debugging - * from the respective Ceph system of at least that importance. - */ -public class CephFileSystem extends FileSystem { - private static final Log LOG = LogFactory.getLog(CephFileSystem.class); - private URI uri; - - private Path workingDir; - private final Path root; - private CephFS ceph = null; - - private static String CEPH_NAMESERVER; - private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver"; - private static final String CEPH_NAMESERVER_DEFAULT = "localhost"; - - /** - * Create a new CephFileSystem. 
- */ - public CephFileSystem() { - root = new Path("/"); - } - - /** - * Used for testing purposes, this constructor - * sets the given CephFS instead of defaulting to a - * CephTalker (with its assumed real Ceph instance to talk to). - */ - public CephFileSystem(CephFS ceph_fs) { - super(); - root = new Path("/"); - ceph = ceph_fs; - } - - /** - * Lets you get the URI of this CephFileSystem. - * @return the URI. - */ - public URI getUri() { - LOG.debug("getUri:exit with return " + uri); - return uri; - } - - /** - * Should be called after constructing a CephFileSystem but before calling - * any other methods. - * Starts up the connection to Ceph, reads in configuraton options, etc. - * @param uri The URI for this filesystem. - * @param conf The Hadoop Configuration to retrieve properties from. - * @throws IOException if necessary properties are unset. - */ - @Override - public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - setConf(conf); - this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - if (ceph == null) { - ceph = new CephTalker(conf, LOG); - } - - CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT); - - // build up the arguments for Ceph - String arguments = "CephFSInterface"; - - arguments += conf.get("fs.ceph.commandLine", ""); - if (conf.get("fs.ceph.clientDebug") != null) { - arguments += " --debug_client "; - arguments += conf.get("fs.ceph.clientDebug"); - } - if (conf.get("fs.ceph.messengerDebug") != null) { - arguments += " --debug_ms "; - arguments += conf.get("fs.ceph.messengerDebug"); - } - if (conf.get("fs.ceph.monAddr") != null) { - arguments += " -m "; - arguments += conf.get("fs.ceph.monAddr"); - } - arguments += " --client-readahead-max-periods=" - + conf.get("fs.ceph.readahead", "1"); - // make sure they gave us a ceph monitor address or conf file - LOG.info("initialize:Ceph initialization arguments: " + arguments); - if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1) - && (arguments.indexOf("-c") == -1)) { - LOG.fatal("initialize:You need to specify a Ceph monitor address."); - throw new IOException( - "You must specify a Ceph monitor address or config file!"); - } - // Initialize the client - if (!ceph.ceph_initializeClient(arguments, - conf.getInt("fs.ceph.blockSize", 1 << 26))) { - LOG.fatal("initialize:Ceph initialization failed!"); - throw new IOException("Ceph initialization failed!"); - } - LOG.info("initialize:Ceph initialized client. Setting cwd to /"); - ceph.ceph_setcwd("/"); - LOG.debug("initialize:exit"); - - this.workingDir = getHomeDirectory(); - } - - /** - * Close down the CephFileSystem. Runs the base-class close method - * and then kills the Ceph client itself. - */ - @Override - public void close() throws IOException { - LOG.debug("close:enter"); - super.close(); // this method does stuff, make sure it's run! - LOG.trace("close: Calling ceph_kill_client from Java"); - ceph.ceph_kill_client(); - LOG.debug("close:exit"); - } - - /** - * Get an FSDataOutputStream to append onto a file. - * @param file The File you want to append onto - * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like. - * @param progress The Progressable to report progress to. - * Reporting is limited but exists. - * @return An FSDataOutputStream that connects to the file on Ceph. - * @throws IOException If the file cannot be found or appended to. 
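To make the configuration handling in initialize() above concrete: it concatenates a Ceph argument string from the fs.ceph.* properties, insists on either a monitor address or a -c config file, and hands the result to ceph_initializeClient together with the fs.ceph.blockSize value. A hedged sketch of what a deployment might have supplied follows; the property names are the ones read above, but the values are invented and the enclosing class is omitted.

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    static FileSystem openCephFileSystem() throws IOException {
      Configuration conf = new Configuration();
      conf.set("fs.ceph.monAddr", "10.0.0.1:6789");      // or pass "-c /path/to/ceph.conf" via fs.ceph.commandLine
      conf.set("fs.ceph.libDir", "/usr/local/lib");      // where libcephfs.so and libhadoopcephfs.so live
      conf.setInt("fs.ceph.blockSize", 1 << 26);         // 64 MB, matching the default used above
      conf.set("fs.ceph.readahead", "1");                // becomes --client-readahead-max-periods=1

      CephFileSystem fs = new CephFileSystem();          // no-arg constructor, so a real CephTalker is used
      fs.initialize(URI.create("ceph://null"), conf);    // same URI form the unit test uses
      return fs;
    }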
- */ - public FSDataOutputStream append(Path file, int bufferSize, - Progressable progress) throws IOException { - LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize); - Path abs_path = makeAbsolute(file); - - if (progress != null) { - progress.progress(); - } - LOG.trace("append: Entering ceph_open_for_append from Java"); - int fd = ceph.ceph_open_for_append(getCephPath(abs_path)); - - LOG.trace("append: Returned to Java"); - if (progress != null) { - progress.progress(); - } - if (fd < 0) { // error in open - throw new IOException( - "append: Open for append failed on path \"" + abs_path.toString() - + "\""); - } - CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd, - bufferSize); - - LOG.debug("append:exit"); - return new FSDataOutputStream(cephOStream, statistics); - } - - /** - * Get the current working directory for the given file system - * @return the directory Path - */ - public Path getWorkingDirectory() { - return workingDir; - } - - /** - * Set the current working directory for the given file system. All relative - * paths will be resolved relative to it. - * - * @param dir The directory to change to. - */ - @Override - public void setWorkingDirectory(Path dir) { - workingDir = makeAbsolute(dir); - } - - /** - * Return only the path component from a potentially fully qualified path. - */ - private String getCephPath(Path path) { - if (!path.isAbsolute()) { - throw new IllegalArgumentException("Path must be absolute: " + path); - } - return path.toUri().getPath(); - } - - /** - * Check if a path exists. - * Overriden because it's moderately faster than the generic implementation. - * @param path The file to check existence on. - * @return true if the file exists, false otherwise. - */ - @Override - public boolean exists(Path path) throws IOException { - LOG.debug("exists:enter with path " + path); - boolean result; - Path abs_path = makeAbsolute(path); - - if (abs_path.equals(root)) { - result = true; - } else { - LOG.trace( - "exists:Calling ceph_exists from Java on path " + abs_path.toString()); - result = ceph.ceph_exists(getCephPath(abs_path)); - LOG.trace("exists:Returned from ceph_exists to Java"); - } - LOG.debug("exists:exit with value " + result); - return result; - } - - /** - * Create a directory and any nonexistent parents. Any portion - * of the directory tree can exist without error. - * @param path The directory path to create - * @param perms The permissions to apply to the created directories. - * @return true if successful, false otherwise - * @throws IOException if the path is a child of a file. - */ - @Override - public boolean mkdirs(Path path, FsPermission perms) throws IOException { - LOG.debug("mkdirs:enter with path " + path); - Path abs_path = makeAbsolute(path); - - LOG.trace("mkdirs:calling ceph_mkdirs from Java"); - int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort()); - - if (result != 0) { - LOG.warn( - "mkdirs: make directory " + abs_path + "Failing with result " + result); - if (-ceph.ENOTDIR == result) { - throw new IOException("Parent path is not a directory"); - } - return false; - } else { - LOG.debug("mkdirs:exiting succesfully"); - return true; - } - } - - /** - * Check if a path is a file. This is moderately faster than the - * generic implementation. - * @param path The path to check. - * @return true if the path is definitely a file, false otherwise. 
- */ - @Override - public boolean isFile(Path path) throws IOException { - LOG.debug("isFile:enter with path " + path); - Path abs_path = makeAbsolute(path); - boolean result; - - if (abs_path.equals(root)) { - result = false; - } else { - LOG.trace("isFile:entering ceph_isfile from Java"); - result = ceph.ceph_isfile(getCephPath(abs_path)); - } - LOG.debug("isFile:exit with result " + result); - return result; - } - - /** - * Get stat information on a file. This does not fill owner or group, as - * Ceph's support for these is a bit different than HDFS'. - * @param path The path to stat. - * @return FileStatus object containing the stat information. - * @throws FileNotFoundException if the path could not be resolved. - */ - public FileStatus getFileStatus(Path path) throws IOException { - LOG.debug("getFileStatus:enter with path " + path); - Path abs_path = makeAbsolute(path); - // sadly, Ceph doesn't really do uids/gids just yet, but - // everything else is filled - FileStatus status; - Stat lstat = new Stat(); - - LOG.trace("getFileStatus: calling ceph_stat from Java"); - if (ceph.ceph_stat(getCephPath(abs_path), lstat)) { - status = new FileStatus(lstat.size, lstat.is_dir, - ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size, - lstat.mod_time, lstat.access_time, - new FsPermission((short) lstat.mode), System.getProperty("user.name"), null, - path.makeQualified(this)); - } else { // fail out - throw new FileNotFoundException( - "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path - + " does not exist or could not be accessed"); - } - - LOG.debug("getFileStatus:exit"); - return status; - } - - /** - * Get the FileStatus for each listing in a directory. - * @param path The directory to get listings from. - * @return FileStatus[] containing one FileStatus for each directory listing; - * null if path does not exist. - */ - public FileStatus[] listStatus(Path path) throws IOException { - LOG.debug("listStatus:enter with path " + path); - Path abs_path = makeAbsolute(path); - Path[] paths = listPaths(abs_path); - - if (paths != null) { - FileStatus[] statuses = new FileStatus[paths.length]; - - for (int i = 0; i < paths.length; ++i) { - statuses[i] = getFileStatus(paths[i]); - } - LOG.debug("listStatus:exit"); - return statuses; - } - - if (isFile(path)) { - return new FileStatus[] { getFileStatus(path) }; - } - - return null; - } - - @Override - public void setPermission(Path p, FsPermission permission) throws IOException { - LOG.debug( - "setPermission:enter with path " + p + " and permissions " + permission); - Path abs_path = makeAbsolute(p); - - LOG.trace("setPermission:calling ceph_setpermission from Java"); - ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort()); - LOG.debug("setPermission:exit"); - } - - /** - * Set access/modification times of a file. - * @param p The path - * @param mtime Set modification time in number of millis since Jan 1, 1970. - * @param atime Set access time in number of millis since Jan 1, 1970. 
- */ - @Override - public void setTimes(Path p, long mtime, long atime) throws IOException { - LOG.debug( - "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime); - Path abs_path = makeAbsolute(p); - - LOG.trace("setTimes:calling ceph_setTimes from Java"); - int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime); - - if (r < 0) { - throw new IOException( - "Failed to set times on path " + abs_path.toString() + " Error code: " - + r); - } - LOG.debug("setTimes:exit"); - } - - /** - * Create a new file and open an FSDataOutputStream that's connected to it. - * @param path The file to create. - * @param permission The permissions to apply to the file. - * @param overwrite If true, overwrite any existing file with - * this name; otherwise don't. - * @param bufferSize Ceph does internal buffering, but you can buffer - * in the Java code too if you like. - * @param replication Ignored by Ceph. This can be - * configured via Ceph configuration. - * @param blockSize Ignored by Ceph. You can set client-wide block sizes - * via the fs.ceph.blockSize param if you like. - * @param progress A Progressable to report back to. - * Reporting is limited but exists. - * @return An FSDataOutputStream pointing to the created file. - * @throws IOException if the path is an - * existing directory, or the path exists but overwrite is false, or there is a - * failure in attempting to open for append with Ceph. - */ - public FSDataOutputStream create(Path path, - FsPermission permission, - boolean overwrite, - int bufferSize, - short replication, - long blockSize, - Progressable progress) throws IOException { - LOG.debug("create:enter with path " + path); - Path abs_path = makeAbsolute(path); - - if (progress != null) { - progress.progress(); - } - // We ignore replication since that's not configurable here, and - // progress reporting is quite limited. 
- // Required semantics: if the file exists, overwrite if 'overwrite' is set; - // otherwise, throw an exception - - // Step 1: existence test - boolean exists = exists(abs_path); - - if (exists) { - if (getFileStatus(abs_path).isDir()) { - throw new IOException( - "create: Cannot overwrite existing directory \"" + path.toString() - + "\" with a file"); - } - if (!overwrite) { - throw new IOException( - "createRaw: Cannot open existing file \"" + abs_path.toString() - + "\" for writing without overwrite flag"); - } - } - - if (progress != null) { - progress.progress(); - } - - // Step 2: create any nonexistent directories in the path - if (!exists) { - Path parent = abs_path.getParent(); - - if (parent != null) { // if parent is root, we're done - int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort()); - - if (!(r == 0 || r == -ceph.EEXIST)) { - throw new IOException("Error creating parent directory; code: " + r); - } - } - if (progress != null) { - progress.progress(); - } - } - // Step 3: open the file - LOG.trace("calling ceph_open_for_overwrite from Java"); - int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path), - (int) permission.toShort()); - - if (progress != null) { - progress.progress(); - } - LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh); - if (fh < 0) { - throw new IOException( - "create: Open for overwrite failed on path \"" + path.toString() - + "\""); - } - - // Step 4: create the stream - OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh, - bufferSize); - - LOG.debug("create:exit"); - return new FSDataOutputStream(cephOStream, statistics); - } - - /** - * Open a Ceph file and attach the file handle to an FSDataInputStream. - * @param path The file to open - * @param bufferSize Ceph does internal buffering; but you can buffer in - * the Java code too if you like. - * @return FSDataInputStream reading from the given path. - * @throws IOException if the path DNE or is a - * directory, or there is an error getting data to set up the FSDataInputStream. - */ - public FSDataInputStream open(Path path, int bufferSize) throws IOException { - LOG.debug("open:enter with path " + path); - Path abs_path = makeAbsolute(path); - - int fh = ceph.ceph_open_for_read(getCephPath(abs_path)); - - if (fh < 0) { // uh-oh, something's bad! - if (fh == -ceph.ENOENT) { // well that was a stupid open - throw new IOException( - "open: absolute path \"" + abs_path.toString() - + "\" does not exist"); - } else { // hrm...the file exists but we can't open it :( - throw new IOException("open: Failed to open file " + abs_path.toString()); - } - } - - if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories - // but that doesn't mean you should in Hadoop! - ceph.ceph_close(fh); - throw new IOException( - "open: absolute path \"" + abs_path.toString() + "\" is a directory!"); - } - Stat lstat = new Stat(); - - LOG.trace("open:calling ceph_stat from Java"); - ceph.ceph_stat(getCephPath(abs_path), lstat); - LOG.trace("open:returned to Java"); - long size = lstat.size; - - if (size < 0) { - throw new IOException( - "Failed to get file size for file " + abs_path.toString() - + " but succeeded in opening file. Something bizarre is going on."); - } - FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size, - bufferSize); - - LOG.debug("open:exit"); - return new FSDataInputStream(cephIStream); - } - - /** - * Rename a file or directory. 
- * @param src The current path of the file/directory - * @param dst The new name for the path. - * @return true if the rename succeeded, false otherwise. - */ - @Override - public boolean rename(Path src, Path dst) throws IOException { - LOG.debug("rename:enter with src:" + src + " and dest:" + dst); - Path abs_src = makeAbsolute(src); - Path abs_dst = makeAbsolute(dst); - - LOG.trace("calling ceph_rename from Java"); - boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst)); - - if (!result) { - boolean isDir = false; - try { - isDir = getFileStatus(abs_dst).isDir(); - } catch (FileNotFoundException e) {} - if (isDir) { // move the srcdir into destdir - LOG.debug("ceph_rename failed but dst is a directory!"); - Path new_dst = new Path(abs_dst, abs_src.getName()); - - result = rename(abs_src, new_dst); - LOG.debug( - "attempt to move " + abs_src.toString() + " to " - + new_dst.toString() + "has result:" + result); - } - } - LOG.debug("rename:exit with result: " + result); - return result; - } - - /* - * Attempt to convert an IP into its hostname - */ - private String[] ips2Hosts(String[] ips) { - ArrayList<String> hosts = new ArrayList<String>(); - for (String ip : ips) { - try { - String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER); - if (host.charAt(host.length()-1) == '.') { - host = host.substring(0, host.length()-1); - } - hosts.add(host); /* append */ - } catch (Exception e) { - LOG.error("reverseDns ["+ip+"] failed: "+ e); - } - } - return hosts.toArray(new String[hosts.size()]); - } - - /** - * Get a BlockLocation object for each block in a file. - * - * Note that this doesn't include port numbers in the name field as - * Ceph handles slow/down servers internally. This data should be used - * only for selecting which servers to run which jobs on. - * - * @param file A FileStatus object corresponding to the file you want locations for. - * @param start The offset of the first part of the file you are interested in. - * @param len The amount of the file past the offset you are interested in. - * @return A BlockLocation[] where each object corresponds to a block within - * the given range. - */ - @Override - public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { - Path abs_path = makeAbsolute(file.getPath()); - - int fh = ceph.ceph_open_for_read(getCephPath(abs_path)); - if (fh < 0) { - LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!"); - return null; - } - - long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path)); - BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)]; - - for (int i = 0; i < locations.length; ++i) { - long offset = start + i * blockSize; - long blockStart = start + i * blockSize - (start % blockSize); - String ips[] = ceph.ceph_hosts(fh, offset); - String hosts[] = ips2Hosts(ips); - locations[i] = new BlockLocation(null, hosts, blockStart, blockSize); - LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]); - } - - ceph.ceph_close(fh); - return locations; - } - - @Deprecated - public boolean delete(Path path) throws IOException { - return delete(path, false); - } - - /** - * Delete the given path, and optionally its children. - * @param path the path to delete. - * @param recursive If the path is a non-empty directory and this is false, - * delete will throw an IOException. If path is a file this is ignored. 
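The index arithmetic in getFileBlockLocations() above deserves a brief illustration: the requested range [start, start + len) is covered by ceil(len / blockSize) entries, entry i asks ceph_hosts() about offset start + i * blockSize, and the reported blockStart is that offset rounded down to a block boundary. A standalone restatement with invented numbers (only the 64 MB block size matches the fs.ceph.blockSize default):

    static void blockMathExample() {
      long blockSize = 1L << 26;                 // 64 MB
      long start = 100L << 20;                   // the query starts 100 MB into the file
      long len   = 200L << 20;                   // and covers the next 200 MB

      int nBlocks = (int) Math.ceil(len / (double) blockSize);  // ceil(200/64) = 4 entries
      for (int i = 0; i < nBlocks; i++) {
        long offset     = start + i * blockSize;          // 100 MB, 164 MB, 228 MB, 292 MB
        long blockStart = offset - (start % blockSize);   // 64 MB, 128 MB, 192 MB, 256 MB
        // ceph_hosts(fh, offset) is consulted at each offset, and each
        // BlockLocation is reported as beginning at blockStart.
      }
    }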
- * @return true if the delete succeeded, false otherwise (including if - * path doesn't exist). - * @throws IOException if you attempt to non-recursively delete a directory, - * or you attempt to delete the root directory. - */ - public boolean delete(Path path, boolean recursive) throws IOException { - LOG.debug("delete:enter with path " + path + " and recursive=" + recursive); - Path abs_path = makeAbsolute(path); - - // sanity check - if (abs_path.equals(root)) { - throw new IOException("Error: deleting the root directory is a Bad Idea."); - } - if (!exists(abs_path)) { - return false; - } - - // if the path is a file, try to delete it. - if (isFile(abs_path)) { - LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path); - boolean result = ceph.ceph_unlink(getCephPath(abs_path)); - - if (!result) { - LOG.error( - "delete: failed to delete file \"" + abs_path.toString() + "\"."); - } - LOG.debug("delete:exit with success=" + result); - return result; - } - - /* The path is a directory, so recursively try to delete its contents, - and then delete the directory. */ - // get the entries; listPaths will remove . and .. for us - Path[] contents = listPaths(abs_path); - - if (contents == null) { - LOG.error( - "delete: Failed to read contents of directory \"" - + abs_path.toString() + "\" while trying to delete it, BAILING"); - return false; - } - if (!recursive && contents.length > 0) { - throw new IOException("Directories must be deleted recursively!"); - } - // delete the entries - LOG.debug("delete: recursively calling delete on contents of " + abs_path); - for (Path p : contents) { - if (!delete(p, true)) { - LOG.error( - "delete: Failed to delete file \"" + p.toString() - + "\" while recursively deleting \"" + abs_path.toString() - + "\", BAILING"); - return false; - } - } - // if we've come this far it's a now-empty directory, so delete it! - boolean result = ceph.ceph_rmdir(getCephPath(abs_path)); - - if (!result) { - LOG.error( - "delete: failed to delete \"" + abs_path.toString() + "\", BAILING"); - } - LOG.debug("delete:exit"); - return result; - } - - /** - * Returns the default replication value of 1. This may - * NOT be the actual value, as replication is controlled - * by a separate Ceph configuration. - */ - @Override - public short getDefaultReplication() { - return 1; - } - - /** - * Get the default block size. - * @return the default block size, in bytes, as a long. - */ - @Override - public long getDefaultBlockSize() { - return getConf().getInt("fs.ceph.blockSize", 1 << 26); - } - - /** - * Adds the working directory to path if path is not already - * an absolute path. The URI scheme is not removed here. It - * is removed only when users (e.g. ceph native calls) need - * the path-only portion. - */ - private Path makeAbsolute(Path path) { - if (path.isAbsolute()) { - return path; - } - return new Path(workingDir, path); - } - - private Path[] listPaths(Path path) throws IOException { - LOG.debug("listPaths:enter with path " + path); - String dirlist[]; - - Path abs_path = makeAbsolute(path); - - // If it's a directory, get the listing. Otherwise, complain and give up. 
- LOG.debug("calling ceph_getdir from Java with path " + abs_path); - dirlist = ceph.ceph_getdir(getCephPath(abs_path)); - LOG.debug("returning from ceph_getdir to Java"); - - if (dirlist == null) { - return null; - } - - // convert the strings to Paths - Path[] paths = new Path[dirlist.length]; - - for (int i = 0; i < dirlist.length; ++i) { - LOG.trace( - "Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" - + dirlist[i] + "\""); - // convert each listing to an absolute path - Path raw_path = new Path(dirlist[i]); - - if (raw_path.isAbsolute()) { - paths[i] = raw_path; - } else { - paths[i] = new Path(abs_path, raw_path); - } - } - LOG.debug("listPaths:exit"); - return paths; - } - - static class Stat { - public long size; - public boolean is_dir; - public long block_size; - public long mod_time; - public long access_time; - public int mode; - - public Stat() {} - } -} diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java deleted file mode 100644 index d9668d031ba..00000000000 --- a/src/client/hadoop/ceph/CephInputStream.java +++ /dev/null @@ -1,254 +0,0 @@ -// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- - -/** - * - * Licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - * - * - * Implements the Hadoop FS interfaces to allow applications to store - * files in Ceph. - */ -package org.apache.hadoop.fs.ceph; - - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSInputStream; - - -/** - * <p> - * An {@link FSInputStream} for a CephFileSystem and corresponding - * Ceph instance. - */ -public class CephInputStream extends FSInputStream { - private static final Log LOG = LogFactory.getLog(CephInputStream.class); - private boolean closed; - - private int fileHandle; - - private long fileLength; - - private CephFS ceph; - - private byte[] buffer; - private int bufPos = 0; - private int bufValid = 0; - private long cephPos = 0; - - /** - * Create a new CephInputStream. - * @param conf The system configuration. Unused. - * @param fh The filehandle provided by Ceph to reference. - * @param flength The current length of the file. If the length changes - * you will need to close and re-open it to access the new data. - */ - public CephInputStream(Configuration conf, CephFS cephfs, - int fh, long flength, int bufferSize) { - // Whoever's calling the constructor is responsible for doing the actual ceph_open - // call and providing the file handle. 
- fileLength = flength; - fileHandle = fh; - closed = false; - ceph = cephfs; - buffer = new byte[bufferSize]; - LOG.debug( - "CephInputStream constructor: initializing stream with fh " + fh - + " and file length " + flength); - - } - - /** Ceph likes things to be closed before it shuts down, - * so closing the IOStream stuff voluntarily in a finalizer is good - */ - protected void finalize() throws Throwable { - try { - if (!closed) { - close(); - } - } finally { - super.finalize(); - } - } - - private synchronized boolean fillBuffer() throws IOException { - bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length); - bufPos = 0; - if (bufValid < 0) { - int err = bufValid; - - bufValid = 0; - // attempt to reset to old position. If it fails, too bad. - ceph.ceph_seek_from_start(fileHandle, cephPos); - throw new IOException("Failed to fill read buffer! Error code:" + err); - } - cephPos += bufValid; - return (bufValid != 0); - } - - /* - * Get the current position of the stream. - */ - public synchronized long getPos() throws IOException { - return cephPos - bufValid + bufPos; - } - - /** - * Find the number of bytes remaining in the file. - */ - @Override - public synchronized int available() throws IOException { - return (int) (fileLength - getPos()); - } - - public synchronized void seek(long targetPos) throws IOException { - LOG.trace( - "CephInputStream.seek: Seeking to position " + targetPos + " on fd " - + fileHandle); - if (targetPos > fileLength) { - throw new IOException( - "CephInputStream.seek: failed seek to position " + targetPos - + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); - } - long oldPos = cephPos; - - cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos); - bufValid = 0; - bufPos = 0; - if (cephPos < 0) { - cephPos = oldPos; - throw new IOException("Ceph failed to seek to new position!"); - } - } - - /** - * Failovers are handled by the Ceph code at a very low level; - * if there are issues that can be solved by changing sources - * they'll be dealt with before anybody even tries to call this method! - * @return false. - */ - public synchronized boolean seekToNewSource(long targetPos) { - return false; - } - - /** - * Read a byte from the file. - * @return the next byte. - */ - @Override - public synchronized int read() throws IOException { - LOG.trace( - "CephInputStream.read: Reading a single byte from fd " + fileHandle - + " by calling general read function"); - - byte result[] = new byte[1]; - - if (getPos() >= fileLength) { - return -1; - } - if (-1 == read(result, 0, 1)) { - return -1; - } - if (result[0] < 0) { - return 256 + (int) result[0]; - } else { - return result[0]; - } - } - - /** - * Read a specified number of bytes from the file into a byte[]. - * @param buf the byte array to read into. - * @param off the offset to start at in the file - * @param len the number of bytes to read - * @return 0 if successful, otherwise an error code. - * @throws IOException on bad input. 
- */ - @Override - public synchronized int read(byte buf[], int off, int len) - throws IOException { - LOG.trace( - "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle); - - if (closed) { - throw new IOException( - "CephInputStream.read: cannot read " + len + " bytes from fd " - + fileHandle + ": stream closed"); - } - - // ensure we're not past the end of the file - if (getPos() >= fileLength) { - LOG.debug( - "CephInputStream.read: cannot read " + len + " bytes from fd " - + fileHandle + ": current position is " + getPos() - + " and file length is " + fileLength); - - return -1; - } - - int totalRead = 0; - int initialLen = len; - int read; - - do { - read = Math.min(len, bufValid - bufPos); - try { - System.arraycopy(buffer, bufPos, buf, off, read); - } catch (IndexOutOfBoundsException ie) { - throw new IOException( - "CephInputStream.read: Indices out of bounds:" + "read length is " - + len + ", buffer offset is " + off + ", and buffer size is " - + buf.length); - } catch (ArrayStoreException ae) { - throw new IOException( - "Uh-oh, CephInputStream failed to do an array" - + "copy due to type mismatch..."); - } catch (NullPointerException ne) { - throw new IOException( - "CephInputStream.read: cannot read " + len + "bytes from fd:" - + fileHandle + ": buf is null"); - } - bufPos += read; - len -= read; - off += read; - totalRead += read; - } while (len > 0 && fillBuffer()); - - LOG.trace( - "CephInputStream.read: Reading " + initialLen + " bytes from fd " - + fileHandle + ": succeeded in reading " + totalRead + " bytes"); - return totalRead; - } - - /** - * Close the CephInputStream and release the associated filehandle. - */ - @Override - public void close() throws IOException { - LOG.trace("CephOutputStream.close:enter"); - if (!closed) { - int result = ceph.ceph_close(fileHandle); - - closed = true; - if (result != 0) { - throw new IOException( - "Close somehow failed!" - + "Don't try and use this stream again, though"); - } - LOG.trace("CephOutputStream.close:exit"); - } - } -} diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java deleted file mode 100644 index 4c50f88467d..00000000000 --- a/src/client/hadoop/ceph/CephOutputStream.java +++ /dev/null @@ -1,219 +0,0 @@ -// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- - -/** - * - * Licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - * - * - * Implements the Hadoop FS interfaces to allow applications to store - * files in Ceph. - */ - -package org.apache.hadoop.fs.ceph; - - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.Progressable; - - -/** - * <p> - * An {@link OutputStream} for a CephFileSystem and corresponding - * Ceph instance. 
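One detail of the read path above worth spelling out is the position bookkeeping: ceph_read() is only invoked once the Java-side buffer is drained, cephPos records how far the native file pointer has advanced, and the caller-visible offset is reconstructed as cephPos - bufValid + bufPos. A tiny worked example of that identity, with made-up numbers:

    static long getPosExample() {
      long cephPos  = 65536;   // bytes the native side has delivered into the buffer so far
      int  bufValid = 8192;    // bytes currently held in the Java-side buffer
      int  bufPos   = 1000;    // bytes of that buffer already handed to the caller
      // 65536 - 8192 + 1000 = 58344: the logical stream position seen by getPos()
      return cephPos - bufValid + bufPos;
    }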
- */ -public class CephOutputStream extends OutputStream { - private static final Log LOG = LogFactory.getLog(CephOutputStream.class); - private boolean closed; - - private CephFS ceph; - - private int fileHandle; - - private byte[] buffer; - private int bufUsed = 0; - - /** - * Construct the CephOutputStream. - * @param conf The FileSystem configuration. - * @param fh The Ceph filehandle to connect to. - */ - public CephOutputStream(Configuration conf, CephFS cephfs, - int fh, int bufferSize) { - ceph = cephfs; - fileHandle = fh; - closed = false; - buffer = new byte[bufferSize]; - } - - /** Ceph likes things to be closed before it shuts down, - *so closing the IOStream stuff voluntarily is good - */ - protected void finalize() throws Throwable { - try { - if (!closed) { - close(); - } - } finally { - super.finalize(); - } - } - - /** - * Get the current position in the file. - * @return The file offset in bytes. - */ - public long getPos() throws IOException { - return ceph.ceph_getpos(fileHandle); - } - - /** - * Write a byte. - * @param b The byte to write. - * @throws IOException If you have closed the CephOutputStream or the - * write fails. - */ - @Override - public synchronized void write(int b) throws IOException { - LOG.trace( - "CephOutputStream.write: writing a single byte to fd " + fileHandle); - - if (closed) { - throw new IOException( - "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle - + ": stream closed"); - } - // Stick the byte in a buffer and write it - byte buf[] = new byte[1]; - - buf[0] = (byte) b; - write(buf, 0, 1); - return; - } - - /** - * Write a byte buffer into the Ceph file. - * @param buf the byte array to write from - * @param off the position in the file to start writing at. - * @param len The number of bytes to actually write. - * @throws IOException if you have closed the CephOutputStream, or - * if buf is null or off + len > buf.length, or - * if the write fails due to a Ceph error. - */ - @Override - public synchronized void write(byte buf[], int off, int len) throws IOException { - LOG.trace( - "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle); - // make sure stream is open - if (closed) { - throw new IOException( - "CephOutputStream.write: cannot write " + len + "bytes to fd " - + fileHandle + ": stream closed"); - } - - int result; - int write; - - while (len > 0) { - write = Math.min(len, buffer.length - bufUsed); - try { - System.arraycopy(buf, off, buffer, bufUsed, write); - } catch (IndexOutOfBoundsException ie) { - throw new IOException( - "CephOutputStream.write: Indices out of bounds: " - + "write length is " + len + ", buffer offset is " + off - + ", and buffer size is " + buf.length); - } catch (ArrayStoreException ae) { - throw new IOException( - "Uh-oh, CephOutputStream failed to do an array" - + " copy due to type mismatch..."); - } catch (NullPointerException ne) { - throw new IOException( - "CephOutputStream.write: cannot write " + len + "bytes to fd " - + fileHandle + ": buffer is null"); - } - bufUsed += write; - len -= write; - off += write; - if (bufUsed == buffer.length) { - result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); - if (result < 0) { - throw new IOException( - "CephOutputStream.write: Buffered write of " + bufUsed - + " bytes failed!"); - } - if (result != bufUsed) { - throw new IOException( - "CephOutputStream.write: Wrote only " + result + " bytes of " - + bufUsed + " in buffer! 
Data may be lost or written" - + " twice to Ceph!"); - } - bufUsed = 0; - } - - } - return; - } - - /** - * Flush the buffered data. - * @throws IOException if you've closed the stream or the write fails. - */ - @Override - public synchronized void flush() throws IOException { - if (!closed) { - if (bufUsed == 0) { - return; - } - int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); - - if (result < 0) { - throw new IOException( - "CephOutputStream.write: Write of " + bufUsed + "bytes to fd " - + fileHandle + " failed"); - } - if (result != bufUsed) { - throw new IOException( - "CephOutputStream.write: Write of " + bufUsed + "bytes to fd " - + fileHandle + "was incomplete: only " + result + " of " + bufUsed - + " bytes were written."); - } - bufUsed = 0; - return; - } - } - - /** - * Close the CephOutputStream. - * @throws IOException if Ceph somehow returns an error. In current code it can't. - */ - @Override - public synchronized void close() throws IOException { - LOG.trace("CephOutputStream.close:enter"); - if (!closed) { - flush(); - int result = ceph.ceph_close(fileHandle); - - if (result != 0) { - throw new IOException("Close failed!"); - } - - closed = true; - LOG.trace("CephOutputStream.close:exit"); - } - } -} diff --git a/src/client/hadoop/ceph/CephTalker.java b/src/client/hadoop/ceph/CephTalker.java deleted file mode 100644 index 569652fdd0b..00000000000 --- a/src/client/hadoop/ceph/CephTalker.java +++ /dev/null @@ -1,91 +0,0 @@ -// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- - -/** - * - * Licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - * - * - * Wraps a number of native function calls to communicate with the Ceph - * filesystem. - */ -package org.apache.hadoop.fs.ceph; - - -import org.apache.hadoop.conf.Configuration; -import org.apache.commons.logging.Log; - - -class CephTalker extends CephFS { - // JNI doesn't give us any way to store pointers, so use a long. - // Here we're assuming pointers aren't longer than 8 bytes. 
- long cluster; - - // we write a constructor so we can load the libraries - public CephTalker(Configuration conf, Log log) { - System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so"); - System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so"); - cluster = 0; - } - - protected native boolean ceph_initializeClient(String arguments, int block_size); - - protected native String ceph_getcwd(); - - protected native boolean ceph_setcwd(String path); - - protected native boolean ceph_rmdir(String path); - - protected native boolean ceph_unlink(String path); - - protected native boolean ceph_rename(String old_path, String new_path); - - protected native boolean ceph_exists(String path); - - protected native long ceph_getblocksize(String path); - - protected native boolean ceph_isdirectory(String path); - - protected native boolean ceph_isfile(String path); - - protected native String[] ceph_getdir(String path); - - protected native int ceph_mkdirs(String path, int mode); - - protected native int ceph_open_for_append(String path); - - protected native int ceph_open_for_read(String path); - - protected native int ceph_open_for_overwrite(String path, int mode); - - protected native int ceph_close(int filehandle); - - protected native boolean ceph_setPermission(String path, int mode); - - protected native boolean ceph_kill_client(); - - protected native boolean ceph_stat(String path, CephFileSystem.Stat fill); - - protected native int ceph_replication(String Path); - - protected native String[] ceph_hosts(int fh, long offset); - - protected native int ceph_setTimes(String path, long mtime, long atime); - - protected native long ceph_getpos(int fh); - - protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); - - protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); - - protected native long ceph_seek_from_start(int fh, long pos); -} diff --git a/src/client/hadoop/ceph/LICENSE b/src/client/hadoop/ceph/LICENSE deleted file mode 100644 index 7a0decda573..00000000000 --- a/src/client/hadoop/ceph/LICENSE +++ /dev/null @@ -1,4 +0,0 @@ -Unlike the rest of the code in this repository, this -directory (src/client/hadoop) is licensed under the Apache License 2.0. This -is for the obvious reason that we want to integrate it into the Apache Hadoop -project.
\ No newline at end of file diff --git a/src/client/hadoop/ceph/TestCeph.java b/src/client/hadoop/ceph/TestCeph.java deleted file mode 100644 index e46b0eed3a1..00000000000 --- a/src/client/hadoop/ceph/TestCeph.java +++ /dev/null @@ -1,45 +0,0 @@ -// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Unit tests for the CephFileSystem API implementation. - */ - -package org.apache.hadoop.fs.ceph; - - -import java.io.IOException; -import java.net.URI; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystemContractBaseTest; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - - -public class TestCeph extends FileSystemContractBaseTest { - - @Override - protected void setUp() throws IOException { - Configuration conf = new Configuration(); - CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG); - CephFileSystem cephfs = new CephFileSystem(cephfaker); - - cephfs.initialize(URI.create("ceph://null"), conf); - fs = cephfs; - } -} diff --git a/src/client/hadoop/ceph/package.html b/src/client/hadoop/ceph/package.html deleted file mode 100644 index 8167b1dde92..00000000000 --- a/src/client/hadoop/ceph/package.html +++ /dev/null @@ -1,101 +0,0 @@ -<html> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<head></head> -<body> -<h1>A client for the Ceph filesystem</h1> - -<h3>Introduction</h3> - -This page describes how to use <a href="http://ceph.newdream.net">Ceph</a> -as a backing store with Hadoop. This page assumes that you have downloaded -the Ceph software and installed necessary binaries as outlined in the Ceph -documentation. - -<h3>Steps</h3> -<ul> - <li>In the Hadoop conf directory edit core-site.xml, - adding the following (with appropriate substitutions). Note that - different nodes can connect to different monitors in the same cluster - without issue (the Ceph client will automatically redirect as necessary). 
-<pre> -<property> - <name>fs.default.name</name> - <value>ceph://null</value> -</property> - -<property> - <name>fs.ceph.monAddr</name> - <value><serverIP:port></value> - <description>The location of the Ceph monitor to connect to. - This should be an IP address, not a domain-based web address.</description> -</property> - -<property> - <name>fs.ceph.libDir</name> - <value>/usr/local/lib</value> - <description>The folder holding libcephfs and libhadoopceph</description> - </property> -</pre> - <li>There are also a number of optional Ceph configuration options. -<pre> -<property> - <name>fs.ceph.blockSize</name> - <value>67108864</value> - <description>Defaulting to 64MB, this is the size (in bytes) you want Ceph to use in striping data internally and presenting it to Hadoop.</description> -</property> - -<property> - <name>fs.ceph.debug</name> - <value>true</value> - <description>If true, the Java-based code will print debugging information to standard error. This is useful if attempting to debug a Ceph issue as it puts both outputs in the same place.</description> -</property> - -<property> - <name>fs.ceph.clientDebug</name> - <value>1</value> - <description>If non-zero, the Ceph client will print debugging information to standard error (a higher number=more debugging).</description> -</property> - -<property> - <name>fs.ceph.messengerDebug</name> - <value>1</value> - <description>If non-zero, the Ceph messenger will print debugging information to standard error(a higher number=more debugging)</description> -</property> - -<property> - <name>fs.ceph.readahead</name> - <value>1</value> - <description>Sets the number of object periods to read ahead in prefetching. This should probably be left at the default of 1.</description> -</property> - -<property> - <name>fs.ceph.commandLine</name> - <value>a string</value> - <description>If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here. <br> -By default, Ceph performs writes across the network rather than locally. To force local writes, add "set_local_pg" in this property.</description> -</property> -</pre> - - <li>Start up your Ceph instance according to the Ceph documentation.</li> - <li>Do not use the bin/start-all.sh commands, as they will attempt to start - up an hdfs instance. 
Just start whatever systems you need and they will - automatically make use of the Ceph filesystem once configured as above.</li> -</body> -</html> diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFS.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFS.h deleted file mode 100644 index 0c07fd56e37..00000000000 --- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFS.h +++ /dev/null @@ -1,13 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include <jni.h> -/* Header for class org_apache_hadoop_fs_ceph_CephFS */ - -#ifndef _Included_org_apache_hadoop_fs_ceph_CephFS -#define _Included_org_apache_hadoop_fs_ceph_CephFS -#ifdef __cplusplus -extern "C" { -#endif -#ifdef __cplusplus -} -#endif -#endif diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem.h deleted file mode 100644 index 6f2bc93926c..00000000000 --- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem.h +++ /dev/null @@ -1,31 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include <jni.h> -/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem */ - -#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem -#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem -#ifdef __cplusplus -extern "C" { -#endif -#undef org_apache_hadoop_fs_ceph_CephFileSystem_EEXIST -#define org_apache_hadoop_fs_ceph_CephFileSystem_EEXIST 17L -#undef org_apache_hadoop_fs_ceph_CephFileSystem_ENOENT -#define org_apache_hadoop_fs_ceph_CephFileSystem_ENOENT 2L -#undef org_apache_hadoop_fs_ceph_CephFileSystem_FATAL -#define org_apache_hadoop_fs_ceph_CephFileSystem_FATAL 0L -#undef org_apache_hadoop_fs_ceph_CephFileSystem_ERROR -#define org_apache_hadoop_fs_ceph_CephFileSystem_ERROR 1L -#undef org_apache_hadoop_fs_ceph_CephFileSystem_WARN -#define org_apache_hadoop_fs_ceph_CephFileSystem_WARN 2L -#undef org_apache_hadoop_fs_ceph_CephFileSystem_INFO -#define org_apache_hadoop_fs_ceph_CephFileSystem_INFO 3L -#undef org_apache_hadoop_fs_ceph_CephFileSystem_DEBUG -#define org_apache_hadoop_fs_ceph_CephFileSystem_DEBUG 4L -#undef org_apache_hadoop_fs_ceph_CephFileSystem_TRACE -#define org_apache_hadoop_fs_ceph_CephFileSystem_TRACE 5L -#undef org_apache_hadoop_fs_ceph_CephFileSystem_NOLOG -#define org_apache_hadoop_fs_ceph_CephFileSystem_NOLOG 6L -#ifdef __cplusplus -} -#endif -#endif diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_CephStat.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_CephStat.h deleted file mode 100644 index 5ab70f7dc66..00000000000 --- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_CephStat.h +++ /dev/null @@ -1,13 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include <jni.h> -/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem_CephStat */ - -#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem_CephStat -#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem_CephStat -#ifdef __cplusplus -extern "C" { -#endif -#ifdef __cplusplus -} -#endif -#endif diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_Stat.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_Stat.h deleted file mode 100644 index e9ade0e4504..00000000000 --- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_Stat.h +++ /dev/null @@ -1,13 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include <jni.h> -/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem_Stat */ - -#ifndef 
_Included_org_apache_hadoop_fs_ceph_CephFileSystem_Stat -#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem_Stat -#ifdef __cplusplus -extern "C" { -#endif -#ifdef __cplusplus -} -#endif -#endif diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephInputStream.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephInputStream.h deleted file mode 100644 index 4ec903294b7..00000000000 --- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephInputStream.h +++ /dev/null @@ -1,47 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include <jni.h> -/* Header for class org_apache_hadoop_fs_ceph_CephInputStream */ - -#ifndef _Included_org_apache_hadoop_fs_ceph_CephInputStream -#define _Included_org_apache_hadoop_fs_ceph_CephInputStream -#ifdef __cplusplus -extern "C" { -#endif -#undef org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE -#define org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE 2048L -/* - * Class: org_apache_hadoop_fs_ceph_CephInputStream - * Method: ceph_read - * Signature: (I[BII)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read - (JNIEnv *, jobject, jint, jbyteArray, jint, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephInputStream - * Method: ceph_seek_from_start - * Signature: (IJ)J - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start - (JNIEnv *, jobject, jint, jlong); - -/* - * Class: org_apache_hadoop_fs_ceph_CephInputStream - * Method: ceph_getpos - * Signature: (I)J - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos - (JNIEnv *, jobject, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephInputStream - * Method: ceph_close - * Signature: (I)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close - (JNIEnv *, jobject, jint); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephOutputStream.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephOutputStream.h deleted file mode 100644 index 676b137c9f9..00000000000 --- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephOutputStream.h +++ /dev/null @@ -1,37 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include <jni.h> -/* Header for class org_apache_hadoop_fs_ceph_CephOutputStream */ - -#ifndef _Included_org_apache_hadoop_fs_ceph_CephOutputStream -#define _Included_org_apache_hadoop_fs_ceph_CephOutputStream -#ifdef __cplusplus -extern "C" { -#endif -/* - * Class: org_apache_hadoop_fs_ceph_CephOutputStream - * Method: ceph_getpos - * Signature: (I)J - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos - (JNIEnv *, jobject, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephOutputStream - * Method: ceph_close - * Signature: (I)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close - (JNIEnv *, jobject, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephOutputStream - * Method: ceph_write - * Signature: (I[BII)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write - (JNIEnv *, jobject, jint, jbyteArray, jint, jint); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephTalker.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephTalker.h deleted file mode 100644 index 55854549b8c..00000000000 --- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephTalker.h +++ /dev/null @@ -1,197 +0,0 @@ -/* DO NOT EDIT 
THIS FILE - it is machine generated */ -#include <jni.h> -/* Header for class org_apache_hadoop_fs_ceph_CephTalker */ - -#ifndef _Included_org_apache_hadoop_fs_ceph_CephTalker -#define _Included_org_apache_hadoop_fs_ceph_CephTalker -#ifdef __cplusplus -extern "C" { -#endif -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_initializeClient - * Signature: (Ljava/lang/String;I)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient - (JNIEnv *, jobject, jstring, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getcwd - * Signature: ()Ljava/lang/String; - */ -JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd - (JNIEnv *, jobject); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setcwd - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_rmdir - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_unlink - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_rename - * Signature: (Ljava/lang/String;Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename - (JNIEnv *, jobject, jstring, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_exists - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getblocksize - * Signature: (Ljava/lang/String;)J - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_isdirectory - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_isfile - * Signature: (Ljava/lang/String;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getdir - * Signature: (Ljava/lang/String;)[Ljava/lang/String; - */ -JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_mkdirs - * Signature: (Ljava/lang/String;I)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs - (JNIEnv *, jobject, jstring, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_append - * Signature: (Ljava/lang/String;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_read - * Signature: (Ljava/lang/String;)I - */ 
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_overwrite - * Signature: (Ljava/lang/String;I)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite - (JNIEnv *, jobject, jstring, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_close - * Signature: (I)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close - (JNIEnv *, jobject, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setPermission - * Signature: (Ljava/lang/String;I)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission - (JNIEnv *, jobject, jstring, jint); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_kill_client - * Signature: ()Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client - (JNIEnv *, jobject); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_stat - * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat - (JNIEnv *, jobject, jstring, jobject); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_statfs - * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs - (JNIEnv *, jobject, jstring, jobject); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_replication - * Signature: (Ljava/lang/String;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication - (JNIEnv *, jobject, jstring); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_hosts - * Signature: (IJ)Ljava/lang/String; - */ -JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts - (JNIEnv *, jobject, jint, jlong); - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setTimes - * Signature: (Ljava/lang/String;JJ)I - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes - (JNIEnv *, jobject, jstring, jlong, jlong); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 2fa72d4ce0f..f6283239660 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -91,7 +91,6 @@ SUBSYS(finisher, 1, 1) SUBSYS(heartbeatmap, 1, 5) SUBSYS(perfcounter, 1, 5) SUBSYS(rgw, 1, 5) // log level for the Rados gateway -SUBSYS(hadoop, 1, 5) SUBSYS(javaclient, 1, 5) SUBSYS(asok, 1, 5) SUBSYS(throttle, 1, 1) diff --git a/src/test/Makefile.am b/src/test/Makefile.am index 54390b1b2de..80ec69425ca 100644 --- a/src/test/Makefile.am +++ b/src/test/Makefile.am @@ -116,19 +116,6 @@ test_build_libcephfs_CFLAGS = $(AM_CFLAGS) test_build_libcephfs_CXXFLAGS = $(AM_CXXFLAGS) bin_DEBUGPROGRAMS += test_build_libcephfs -if WITH_HADOOPCLIENT -test_build_libhadoopcephfs_SOURCES = \ - test/buildtest_skeleton.cc \ - $(libhadoopcephfs_la_SOURCES) -test_build_libhadoopcephfs_LDADD = \ - $(LIBCEPHFS) -lexpat \ - $(PTHREAD_LIBS) $(CRYPTO_LIBS) $(EXTRALIBS) -test_build_libhadoopcephfs_LDFLAGS = -static-libtool-libs -test_build_libhadoopcephfs_CFLAGS = $(AM_CFLAGS) -test_build_libhadoopcephfs_CXXFLAGS = $(AM_CXXFLAGS) -bin_DEBUGPROGRAMS += 
test_build_libhadoopcephfs -endif # WITH_HADOOPCLIENT - - endif # WITH_BUILD_TESTS