diff options
Diffstat (limited to 'src/client/hadoop/CephFSInterface.cc')
-rw-r--r-- | src/client/hadoop/CephFSInterface.cc | 993 |
1 files changed, 0 insertions, 993 deletions
diff --git a/src/client/hadoop/CephFSInterface.cc b/src/client/hadoop/CephFSInterface.cc deleted file mode 100644 index d5a3c8f4fcd..00000000000 --- a/src/client/hadoop/CephFSInterface.cc +++ /dev/null @@ -1,993 +0,0 @@ -// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -#include "CephFSInterface.h" -#include "include/cephfs/libcephfs.h" -#include "common/ceph_argparse.h" -#include "common/config.h" -#include "msg/SimpleMessenger.h" - -#include <arpa/inet.h> -#include <sys/stat.h> -#include <sys/statvfs.h> - -#define dout_subsys ceph_subsys_hadoop - -union ceph_mount_union_t { - struct ceph_mount_info *cmount; - jlong cjlong; -}; - -static void set_ceph_mount_info(JNIEnv *env, jobject obj, struct ceph_mount_info *cmount) -{ - jclass cls = env->GetObjectClass(obj); - if (cls == NULL) - return; - jfieldID fid = env->GetFieldID(cls, "cluster", "J"); - if (fid == NULL) - return; - ceph_mount_union_t ceph_mount_union; - ceph_mount_union.cjlong = 0; - ceph_mount_union.cmount = cmount; - env->SetLongField(obj, fid, ceph_mount_union.cjlong); -} - -static struct ceph_mount_info *get_ceph_mount_t(JNIEnv *env, jobject obj) -{ - jclass cls = env->GetObjectClass(obj); - jfieldID fid = env->GetFieldID(cls, "cluster", "J"); - if (fid == NULL) - return NULL; - ceph_mount_union_t ceph_mount_union; - ceph_mount_union.cjlong = env->GetLongField(obj, fid); - return ceph_mount_union.cmount; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_initializeClient - * Signature: (Ljava/lang/String;I)Z - * - * Performs any necessary setup to allow general use of the filesystem. - * Inputs: - * jstring args -- a command-line style input of Ceph config params - * jint block_size -- the size in bytes to use for blocks - * Returns: true on success, false otherwise - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient - (JNIEnv *env, jobject obj, jstring j_args, jint block_size) -{ - // Convert Java argument string to argv - const char *c_args = env->GetStringUTFChars(j_args, 0); - if (c_args == NULL) - return false; //out of memory! - string cppargs(c_args); - char b[cppargs.length()+1]; - strcpy(b, cppargs.c_str()); - env->ReleaseStringUTFChars(j_args, c_args); - std::vector<const char*> args; - char *p = b; - while (*p) { - args.push_back(p); - while (*p && *p != ' ') - p++; - if (!*p) - break; - *p++ = 0; - while (*p && *p == ' ') - p++; - } - - // parse the arguments - bool set_local_writes = false; - std::string mount_root, val; - for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) { - if (ceph_argparse_witharg(args, i, &val, "mount_root", (char*)NULL)) { - mount_root = val; - } else if (ceph_argparse_flag(args, i, "set_local_pg", (char*)NULL)) { - set_local_writes = true; - } else { - ++i; - } - } - - // connect to the cmount - struct ceph_mount_info *cmount; - int ret = ceph_create(&cmount, NULL); - if (ret) - return false; - ceph_conf_read_file(cmount, NULL); // read config file from the default location - ceph_conf_parse_argv(cmount, args.size(), &args[0]); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 3) << "CephFSInterface: mounting filesystem...:" << dendl; - - ret = ceph_mount(cmount, mount_root.c_str()); - if (ret) - return false; - - ceph_localize_reads(cmount, true); - ceph_set_default_file_stripe_unit(cmount, block_size); - ceph_set_default_object_size(cmount, block_size); - - if (set_local_writes) { - ceph_set_default_preferred_pg(cmount, ceph_get_local_osd(cmount)); - } - - set_ceph_mount_info(env, obj, cmount); - return true; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getcwd - * Signature: (J)Ljava/lang/String; - * - * Returns the current working directory.(absolute) as a jstring - */ -JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd - (JNIEnv *env, jobject obj) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In getcwd" << dendl; - jstring j_path = env->NewStringUTF(ceph_getcwd(cmount)); - return j_path; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setcwd - * Signature: (Ljava/lang/String;)Z - * - * Changes the working directory. - * Inputs: - * jstring j_path: The path (relative or absolute) to switch to - * Returns: true on success, false otherwise. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd -(JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In setcwd" << dendl; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - int ret = ceph_chdir(cmount, c_path); - env->ReleaseStringUTFChars(j_path, c_path); - return ret ? JNI_FALSE : JNI_TRUE; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_rmdir - * Signature: (Ljava/lang/String;)Z - * - * Given a path to a directory, removes the directory.if empty. - * Inputs: - * jstring j_path: The path (relative or absolute) to the directory - * Returns: true on successful delete; false otherwise - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In rmdir" << dendl; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if(c_path == NULL) - return false; - int ret = ceph_rmdir(cmount, c_path); - env->ReleaseStringUTFChars(j_path, c_path); - return ret ? JNI_FALSE : JNI_TRUE; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_unlink - * Signature: (Ljava/lang/String;)Z - * Given a path, unlinks it. - * Inputs: - * jstring j_path: The path (relative or absolute) to the file or empty dir - * Returns: true if the unlink occurred, false otherwise. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - ldout(cct, 10) << "CephFSInterface: In unlink for path " << c_path << ":" << dendl; - int ret = ceph_unlink(cmount, c_path); - env->ReleaseStringUTFChars(j_path, c_path); - return ret ? JNI_FALSE : JNI_TRUE; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_rename - * Signature: (Ljava/lang/String;Ljava/lang/String;)Z - * Changes a given path name to a new name. - * Inputs: - * jstring j_from: The path whose name you want to change. - * jstring j_to: The new name for the path. - * Returns: true if the rename occurred, false otherwise - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename - (JNIEnv *env, jobject obj, jstring j_from, jstring j_to) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In rename" << dendl; - const char *c_from = env->GetStringUTFChars(j_from, 0); - if (c_from == NULL) - return false; - const char *c_to = env->GetStringUTFChars(j_to, 0); - if (c_to == NULL) { - env->ReleaseStringUTFChars(j_from, c_from); - return false; - } - struct stat stbuf; - int ret = ceph_lstat(cmount, c_to, &stbuf); - if (ret != -ENOENT) { - // Hadoop doesn't want to overwrite files in a rename. - env->ReleaseStringUTFChars(j_from, c_from); - env->ReleaseStringUTFChars(j_to, c_to); - return JNI_FALSE; - } - - ret = ceph_rename(cmount, c_from, c_to); - return ret ? JNI_FALSE : JNI_TRUE; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_exists - * Signature: (Ljava/lang/String;)Z - * Returns true if it the input path exists, false - * if it does not or there is an unexpected failure. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists -(JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "CephFSInterface: In exists" << dendl; - - struct stat stbuf; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - ldout(cct, 10) << "Attempting lstat with file " << c_path << ":" << dendl; - - int ret = ceph_lstat(cmount, c_path, &stbuf); - ldout(cct, 10) << "result is " << ret << dendl; - env->ReleaseStringUTFChars(j_path, c_path); - if (ret < 0) { - ldout(cct, 10) << "Returning false (file does not exist)" << dendl; - return JNI_FALSE; - } - else { - ldout(cct, 10) << "Returning true (file exists)" << dendl; - return JNI_TRUE; - } -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getblocksize - * Signature: (Ljava/lang/String;)J - * Get the block size for a given path. - * Input: - * j_string j_path: The path (relative or absolute) you want - * the block size for. - * Returns: block size (as a long) if the path exists, otherwise a negative - * number corresponding to the standard C++ error codes (which are positive). - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In getblocksize" << dendl; - - //struct stat stbuf; - - jlong result; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - // we need to open the file to retrieve the stripe size - ldout(cct, 10) << "CephFSInterface: getblocksize: opening file" << dendl; - int fh = ceph_open(cmount, c_path, O_RDONLY, 0); - env->ReleaseStringUTFChars(j_path, c_path); - if (fh < 0) - return fh; - - result = ceph_get_file_stripe_unit(cmount, fh); - - int close_result = ceph_close(cmount, fh); - if (close_result < 0) - return close_result; - - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_isfile - * Signature: (Ljava/lang/String;)Z - * Returns true if the given path is a file; false otherwise. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In isfile" << dendl; - - struct stat stbuf; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - int ret = ceph_lstat(cmount, c_path, &stbuf); - env->ReleaseStringUTFChars(j_path, c_path); - - // if the stat call failed, it's definitely not a file... - if (ret < 0) - return false; - - // check the stat result - return !!S_ISREG(stbuf.st_mode); -} - - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_isdirectory - * Signature: (Ljava/lang/String;)Z - * Returns true if the given path is a directory, false otherwise. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In isdirectory" << dendl; - - struct stat stbuf; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - int result = ceph_lstat(cmount, c_path, &stbuf); - env->ReleaseStringUTFChars(j_path, c_path); - - // if the stat call failed, it's definitely not a directory... - if (result < 0) - return JNI_FALSE; - - // check the stat result - return !!S_ISDIR(stbuf.st_mode); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getdir - * Signature: (Ljava/lang/String;)[Ljava/lang/String; - * Get the contents of a given directory. - * Inputs: - * jstring j_path: The path (relative or absolute) to the directory. - * Returns: A Java String[] of the contents of the directory, or - * NULL if there is an error (ie, path is not a dir). This listing - * will not contain . or .. entries. - */ -JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir -(JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In getdir" << dendl; - - // get the directory listing - list<string> contents; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return NULL; - struct ceph_dir_result *dirp; - int r; - r = ceph_opendir(cmount, c_path, &dirp); - if (r<0) { - env->ReleaseStringUTFChars(j_path, c_path); - return NULL; - } - int buflen = 100; //good default? - char *buf = new char[buflen]; - string *ent; - int bufpos; - while (1) { - r = ceph_getdnames(cmount, dirp, buf, buflen); - if (r==-ERANGE) { //expand the buffer - delete [] buf; - buflen *= 2; - buf = new char[buflen]; - continue; - } - if (r<=0) break; - - //if we make it here, we got at least one name - bufpos = 0; - while (bufpos<r) {//make new strings and add them to listing - ent = new string(buf+bufpos); - if (ent->compare(".") && ent->compare("..")) - //we DON'T want to include dot listings; Hadoop gets confused - contents.push_back(*ent); - bufpos+=ent->size()+1; - delete ent; - } - } - delete [] buf; - ceph_closedir(cmount, dirp); - env->ReleaseStringUTFChars(j_path, c_path); - - if (r < 0) return NULL; - - // Create a Java String array of the size of the directory listing - jclass stringClass = env->FindClass("java/lang/String"); - if (stringClass == NULL) { - ldout(cct, 0) << "ERROR: java String class not found; dying a horrible, painful death" << dendl; - assert(0); - } - jobjectArray dirListingStringArray = (jobjectArray) env->NewObjectArray(contents.size(), stringClass, NULL); - if(dirListingStringArray == NULL) return NULL; - - // populate the array with the elements of the directory list - int i = 0; - for (list<string>::iterator it = contents.begin(); - it != contents.end(); - ++it) { - env->SetObjectArrayElement(dirListingStringArray, i, - env->NewStringUTF(it->c_str())); - ++i; - } - - return dirListingStringArray; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_mkdirs - * Signature: (Ljava/lang/String;I)I - * Create the specified directory and any required intermediate ones with the - * given mode. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs -(JNIEnv *env, jobject obj, jstring j_path, jint mode) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In Hadoop mk_dirs" << dendl; - - //get c-style string and make the call, clean up the string... - jint result; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - result = ceph_mkdirs(cmount, c_path, mode); - env->ReleaseStringUTFChars(j_path, c_path); - - //...and return - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_append - * Signature: (Ljava/lang/String;)I - * Open a file to append. If the file does not exist, it will be created. - * Opening a dir is possible but may have bad results. - * Inputs: - * jstring j_path: The path to open. - * Returns: a jint filehandle, or a number<0 if an error occurs. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append -(JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In hadoop open_for_append" << dendl; - - jint result; - - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_APPEND, 0); - env->ReleaseStringUTFChars(j_path, c_path); - - return result; -} - - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_read - * Signature: (Ljava/lang/String;)I - * Open a file for reading. - * Opening a dir is possible but may have bad results. - * Inputs: - * jstring j_path: The path to open. - * Returns: a jint filehandle, or a number<0 if an error occurs. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read - (JNIEnv *env, jobject obj, jstring j_path) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In open_for_read" << dendl; - - jint result; - - // open as read-only: flag = O_RDONLY - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - result = ceph_open(cmount, c_path, O_RDONLY, 0); - env->ReleaseStringUTFChars(j_path, c_path); - - // returns file handle, or -1 on failure - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_open_for_overwrite - * Signature: (Ljava/lang/String;)I - * Opens a file for overwriting; creates it if necessary. - * Opening a dir is possible but may have bad results. - * Inputs: - * jstring j_path: The path to open. - * jint mode: The mode to open with. - * Returns: a jint filehandle, or a number<0 if an error occurs. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite - (JNIEnv *env, jobject obj, jstring j_path, jint mode) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In open_for_overwrite" << dendl; - - jint result; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_TRUNC, mode); - env->ReleaseStringUTFChars(j_path, c_path); - - // returns file handle, or -1 on failure - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_close - * Signature: (I)I - * Closes a given filehandle. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close -(JNIEnv *env, jobject obj, jint fh) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In CephTalker::ceph_close" << dendl; - return ceph_close(cmount, fh); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setPermission - * Signature: (Ljava/lang/String;I)Z - * Change the mode on a path. - * Inputs: - * jstring j_path: The path to change mode on. - * jint j_new_mode: The mode to apply. - * Returns: true if the mode is properly applied, false if there - * is any error. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission -(JNIEnv *env, jobject obj, jstring j_path, jint j_new_mode) -{ - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return false; - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - int result = ceph_chmod(cmount, c_path, j_new_mode); - env->ReleaseStringUTFChars(j_path, c_path); - - return (result==0); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_kill_client - * Signature: (J)Z - * - * Closes the Ceph client. This should be called before shutting down - * (multiple times is okay but redundant). - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client - (JNIEnv *env, jobject obj) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - if (!cmount) - return true; - ceph_shutdown(cmount); - set_ceph_mount_info(env, obj, NULL); - return true; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_stat - * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z - * Get the statistics on a path returned in a custom format defined - * in CephTalker. - * Inputs: - * jstring j_path: The path to stat. - * jobject j_stat: The stat object to fill. - * Returns: true if the stat is successful, false otherwise. - */ -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat -(JNIEnv *env, jobject obj, jstring j_path, jobject j_stat) -{ - //setup variables - struct stat st; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return false; - - jclass cls = env->GetObjectClass(j_stat); - if (cls == NULL) return false; - jfieldID c_size_id = env->GetFieldID(cls, "size", "J"); - if (c_size_id == NULL) return false; - jfieldID c_dir_id = env->GetFieldID(cls, "is_dir", "Z"); - if (c_dir_id == NULL) return false; - jfieldID c_block_id = env->GetFieldID(cls, "block_size", "J"); - if (c_block_id == NULL) return false; - jfieldID c_mod_id = env->GetFieldID(cls, "mod_time", "J"); - if (c_mod_id == NULL) return false; - jfieldID c_access_id = env->GetFieldID(cls, "access_time", "J"); - if (c_access_id == NULL) return false; - jfieldID c_mode_id = env->GetFieldID(cls, "mode", "I"); - if (c_mode_id == NULL) return false; - //do actual lstat - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - int r = ceph_lstat(cmount, c_path, &st); - env->ReleaseStringUTFChars(j_path, c_path); - - if (r < 0) return false; //fail out; file DNE or Ceph broke - - //put variables from struct stat into Java - env->SetLongField(j_stat, c_size_id, st.st_size); - env->SetBooleanField(j_stat, c_dir_id, (0 != S_ISDIR(st.st_mode))); - env->SetLongField(j_stat, c_block_id, st.st_blksize); - - long long java_mtime(st.st_mtim.tv_sec); - java_mtime *= 1000; - java_mtime += st.st_mtim.tv_nsec / 1000; - env->SetLongField(j_stat, c_mod_id, java_mtime); - - long long java_atime(st.st_atim.tv_sec); - java_atime *= 1000; - java_atime += st.st_atim.tv_nsec / 1000; - env->SetLongField(j_stat, c_access_id, java_atime); - - env->SetIntField(j_stat, c_mode_id, (int)st.st_mode); - - //return happy - return true; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_statfs - * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I - * Statfs a filesystem in a custom format defined in CephTalker. - * Inputs: - * jstring j_path: A path on the filesystem that you wish to stat. - * jobject j_ceph_stat: The CephStat object to fill. - * Returns: true if successful and the CephStat is filled; false otherwise. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs -(JNIEnv *env, jobject obj, jstring j_path, jobject j_cephstat) -{ - //setup variables - struct statvfs stbuf; - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - jclass cls = env->GetObjectClass(j_cephstat); - if (cls == NULL) - return 1; //JVM error of some kind - jfieldID c_capacity_id = env->GetFieldID(cls, "capacity", "J"); - jfieldID c_used_id = env->GetFieldID(cls, "used", "J"); - jfieldID c_remaining_id = env->GetFieldID(cls, "remaining", "J"); - - //do the statfs - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - int r = ceph_statfs(cmount, c_path, &stbuf); - env->ReleaseStringUTFChars(j_path, c_path); - if (r != 0) - return r; //something broke - - //place info into Java; convert from bytes to kilobytes - env->SetLongField(j_cephstat, c_capacity_id, - (long)stbuf.f_blocks*stbuf.f_bsize/1024); - env->SetLongField(j_cephstat, c_used_id, - (long)(stbuf.f_blocks-stbuf.f_bavail)*stbuf.f_bsize/1024); - env->SetLongField(j_cephstat, c_remaining_id, - (long)stbuf.f_bavail*stbuf.f_bsize/1024); - return r; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_replication - * Signature: (Ljava/lang/String;)I - * Check how many times a path should be replicated (if it is - * degraded it may not actually be replicated this often). - * Inputs: - * jstring j_path: The path to check. - * Returns: an int containing the number of times replicated. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication -(JNIEnv *env, jobject obj, jstring j_path) -{ - //get c-string of path, send off to libceph, release c-string, return - const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) - return -ENOMEM; - ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - int fh = 0; - fh = ceph_open(cmount, c_path, O_RDONLY, 0); - env->ReleaseStringUTFChars(j_path, c_path); - if (fh < 0) { - return fh; - } - int replication = ceph_get_file_replication(cmount, fh); - ceph_close(cmount, fh); - return replication; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_hosts - * Signature: (IJ)[Ljava/lang/String; - * Find the IP:port addresses of the primary OSD for a given file and offset. - * Inputs: - * jint j_fh: The filehandle for the file. - * jlong j_offset: The offset to get the location of. - * Returns: a jstring of the location as IP, or NULL if there is an error. - */ -JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts -(JNIEnv *env, jobject obj, jint j_fh, jlong j_offset) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - struct sockaddr_storage *ss; - char address[30]; - jobjectArray addr_array; - jclass string_cls; - jstring j_addr; - int r, n = 3; /* initial guess at # of replicas */ - - for (;;) { - ss = new struct sockaddr_storage[n]; - r = ceph_get_file_stripe_address(cmount, j_fh, j_offset, ss, n); - if (r < 0) { - if (r == -ERANGE) { - delete [] ss; - n *= 2; - continue; - } - return NULL; - } - n = r; - break; - } - - /* TODO: cache this */ - string_cls = env->FindClass("java/lang/String"); - if (!string_cls) - goto out; - - addr_array = env->NewObjectArray(n, string_cls, NULL); - if (!addr_array) - goto out; - - for (r = 0; r < n; r++) { - /* Hadoop only deals with IPv4 */ - if (ss[r].ss_family != AF_INET) - goto out; - - memset(address, 0, sizeof(address)); - - inet_ntop(ss[r].ss_family, &((struct sockaddr_in *)&ss[r])->sin_addr, - address, sizeof(address)); - - j_addr = env->NewStringUTF(address); - - env->SetObjectArrayElement(addr_array, r, j_addr); - if (env->ExceptionOccurred()) - goto out; - - env->DeleteLocalRef(j_addr); - } - - delete [] ss; - return addr_array; - -out: - delete [] ss; - return NULL; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_setTimes - * Signature: (Ljava/lang/String;JJ)I - * Set the mtime and atime for a given path. - * Inputs: - * jstring j_path: The path to set the times for. - * jlong mtime: The mtime to set, in millis since epoch (-1 to not set). - * jlong atime: The atime to set, in millis since epoch (-1 to not set) - * Returns: 0 if successful, an error code otherwise. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes -(JNIEnv *env, jobject obj, jstring j_path, jlong mtime, jlong atime) -{ - const char *c_path = env->GetStringUTFChars(j_path, 0); - if(c_path == NULL) return -ENOMEM; - - //build the mask for ceph_setattr - int mask = 0; - if (mtime!=-1) mask = CEPH_SETATTR_MTIME; - if (atime!=-1) mask |= CEPH_SETATTR_ATIME; - //build a struct stat and fill it in! - //remember to convert from millis to seconds and microseconds - struct stat attr; - attr.st_mtim.tv_sec = mtime / 1000; - attr.st_mtim.tv_nsec = (mtime % 1000) * 1000000; - attr.st_atim.tv_sec = atime / 1000; - attr.st_atim.tv_nsec = (atime % 1000) * 1000000; - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - return ceph_setattr(cmount, c_path, &attr, mask); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_read - * Signature: (JI[BII)I - * Reads into the given byte array from the current position. - * Inputs: - * jint fh: the filehandle to read from - * jbyteArray j_buffer: the byte array to read into - * jint buffer_offset: where in the buffer to start writing - * jint length: how much to read. - * There'd better be enough space in the buffer to write all - * the data from the given offset! - * Returns: the number of bytes read on success (as jint), - * or an error code otherwise. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read - (JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In read" << dendl; - - - // Make sure to convert the Hadoop read arguments into a - // more ceph-friendly form - jint result; - - // Step 1: get a pointer to the buffer. - jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL); - if (j_buffer_ptr == NULL) return -ENOMEM; - char *c_buffer = (char*) j_buffer_ptr; - - // Step 2: pointer arithmetic to start in the right buffer position - c_buffer += (int)buffer_offset; - - // Step 3: do the read - result = ceph_read(cmount, (int)fh, c_buffer, length, -1); - - // Step 4: release the pointer to the buffer - env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0); - - return result; -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_seek_from_start - * Signature: (JIJ)J - * Seeks to the given position in the given file. - * Inputs: - * jint fh: The filehandle to seek in. - * jlong pos: The position to seek to. - * Returns: the new position (as a jlong) of the filehandle on success, - * or a negative error code on failure. - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start - (JNIEnv *env, jobject obj, jint fh, jlong pos) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In CephTalker::seek_from_start" << dendl; - return ceph_lseek(cmount, fh, pos, SEEK_SET); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_getpos - * Signature: (I)J - * - * Get the current position in a file (as a jlong) of a given filehandle. - * Returns: jlong current file position on success, or a - * negative error code on failure. - */ -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos - (JNIEnv *env, jobject obj, jint fh) -{ - // seek a distance of 0 to get current offset - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In CephTalker::ceph_getpos" << dendl; - return ceph_lseek(cmount, fh, 0, SEEK_CUR); -} - -/* - * Class: org_apache_hadoop_fs_ceph_CephTalker - * Method: ceph_write - * Signature: (I[BII)I - * Write the given buffer contents to the given filehandle. - * Inputs: - * jint fh: The filehandle to write to. - * jbyteArray j_buffer: The buffer to write from - * jint buffer_offset: The position in the buffer to write from - * jint length: The number of (sequential) bytes to write. - * Returns: jint, on success the number of bytes written, on failure - * a negative error code. - */ -JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write - (JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length) -{ - struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj); - CephContext *cct = ceph_get_mount_context(cmount); - ldout(cct, 10) << "In write" << dendl; - - // IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we - // have to convert. The write is *always* from the current position in the file, - // and buffer_offset is the location in the *buffer* where we start writing. - jint result; - - // Step 1: get a pointer to the buffer. - jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL); - if (j_buffer_ptr == NULL) - return -ENOMEM; - char *c_buffer = (char*) j_buffer_ptr; - - // Step 2: pointer arithmetic to start in the right buffer position - c_buffer += (int)buffer_offset; - - // Step 3: do the write - result = ceph_write(cmount, (int)fh, c_buffer, length, -1); - - // Step 4: release the pointer to the buffer - env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0); - - return result; -} |