summaryrefslogtreecommitdiff
path: root/src/client/hadoop/ceph/CephFaker.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/hadoop/ceph/CephFaker.java')
-rw-r--r--src/client/hadoop/ceph/CephFaker.java483
1 files changed, 0 insertions, 483 deletions
diff --git a/src/client/hadoop/ceph/CephFaker.java b/src/client/hadoop/ceph/CephFaker.java
deleted file mode 100644
index c598f536039..00000000000
--- a/src/client/hadoop/ceph/CephFaker.java
+++ /dev/null
@@ -1,483 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * This uses the local Filesystem but pretends to be communicating
- * with a Ceph deployment, for unit testing the CephFileSystem.
- */
-
-package org.apache.hadoop.fs.ceph;
-
-
-import java.net.URI;
-import java.util.Hashtable;
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-
-class CephFaker extends CephFS {
- private static final Log LOG = LogFactory.getLog(CephFaker.class);
- FileSystem localFS;
- String localPrefix;
- int blockSize;
- Configuration conf;
- Hashtable<Integer, Object> files;
- Hashtable<Integer, String> filenames;
- int fileCount = 0;
- boolean initialized = false;
-
- public CephFaker(Configuration con, Log log) {
- conf = con;
- files = new Hashtable<Integer, Object>();
- filenames = new Hashtable<Integer, String>();
- }
-
- protected boolean ceph_initializeClient(String args, int block_size) {
- if (!initialized) {
- // let's remember the default block_size
- blockSize = block_size;
-
- /* for a real Ceph deployment, this starts up the client,
- * sets debugging levels, etc. We just need to get the
- * local FileSystem to use, and we'll ignore any
- * command-line arguments. */
- try {
- localFS = FileSystem.getLocal(conf);
- localFS.initialize(URI.create("file://localhost"), conf);
- localFS.setVerifyChecksum(false);
- String testDir = conf.get("hadoop.tmp.dir");
-
- localPrefix = localFS.getWorkingDirectory().toString();
- int testDirLoc = localPrefix.indexOf(testDir) - 1;
-
- if (-2 == testDirLoc) {
- testDirLoc = localPrefix.length();
- }
- localPrefix = localPrefix.substring(0, testDirLoc) + "/"
- + conf.get("hadoop.tmp.dir");
-
- localFS.setWorkingDirectory(
- new Path(localPrefix + "/user/" + System.getProperty("user.name")));
- // I don't know why, but the unit tests expect the default
- // working dir to be /user/username, so satisfy them!
- // debug("localPrefix is " + localPrefix, INFO);
- } catch (IOException e) {
- return false;
- }
- initialized = true;
- }
- return true;
- }
-
- protected String ceph_getcwd() {
- return sanitize_path(localFS.getWorkingDirectory().toString());
- }
-
- protected boolean ceph_setcwd(String path) {
- localFS.setWorkingDirectory(new Path(prepare_path(path)));
- return true;
- }
-
- // the caller is responsible for ensuring empty dirs
- protected boolean ceph_rmdir(String pth) {
- Path path = new Path(prepare_path(pth));
- boolean ret = false;
-
- try {
- if (localFS.listStatus(path).length <= 1) {
- ret = localFS.delete(path, true);
- }
- } catch (IOException e) {}
- return ret;
- }
-
- // this needs to work on (empty) directories too
- protected boolean ceph_unlink(String path) {
- path = prepare_path(path);
- boolean ret = false;
-
- if (ceph_isdirectory(path)) {
- ret = ceph_rmdir(path);
- } else {
- try {
- ret = localFS.delete(new Path(path), false);
- } catch (IOException e) {}
- }
- return ret;
- }
-
- protected boolean ceph_rename(String oldName, String newName) {
- oldName = prepare_path(oldName);
- newName = prepare_path(newName);
- try {
- Path parent = new Path(newName).getParent();
- Path newPath = new Path(newName);
-
- if (localFS.exists(parent) && !localFS.exists(newPath)) {
- return localFS.rename(new Path(oldName), newPath);
- }
- return false;
- } catch (IOException e) {
- return false;
- }
- }
-
- protected boolean ceph_exists(String path) {
- path = prepare_path(path);
- boolean ret = false;
-
- try {
- ret = localFS.exists(new Path(path));
- } catch (IOException e) {}
- return ret;
- }
-
- protected long ceph_getblocksize(String path) {
- path = prepare_path(path);
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
-
- return status.getBlockSize();
- } catch (FileNotFoundException e) {
- return -CephFS.ENOENT;
- } catch (IOException e) {
- return -1; // just fail generically
- }
- }
-
- protected boolean ceph_isdirectory(String path) {
- path = prepare_path(path);
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
-
- return status.isDir();
- } catch (IOException e) {
- return false;
- }
- }
-
- protected boolean ceph_isfile(String path) {
- path = prepare_path(path);
- boolean ret = false;
-
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
-
- ret = !status.isDir();
- } catch (Exception e) {}
- return ret;
- }
-
- protected String[] ceph_getdir(String path) {
- path = prepare_path(path);
- if (!ceph_isdirectory(path)) {
- return null;
- }
- try {
- FileStatus[] stats = localFS.listStatus(new Path(path));
- String[] names = new String[stats.length];
- String name;
-
- for (int i = 0; i < stats.length; ++i) {
- name = stats[i].getPath().toString();
- names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
- }
- return names;
- } catch (IOException e) {}
- return null;
- }
-
- protected int ceph_mkdirs(String path, int mode) {
- path = prepare_path(path);
- // debug("ceph_mkdirs on " + path, INFO);
- try {
- if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
- return 0;
- }
- } catch (IOException e) {}
- if (ceph_isdirectory(path)) { // apparently it already existed
- return -EEXIST;
- } else if (ceph_isfile(path)) {
- return -ENOTDIR;
- }
- return -1;
- }
-
- /*
- * Unlike a real Ceph deployment, you can't do opens on a directory.
- * Since that has unpredictable behavior and you shouldn't do it anyway,
- * it's okay.
- */
- protected int ceph_open_for_append(String path) {
- path = prepare_path(path);
- FSDataOutputStream stream;
-
- try {
- stream = localFS.append(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- return fileCount++;
- } catch (IOException e) {}
- return -1; // failure
- }
-
- protected int ceph_open_for_read(String path) {
- path = prepare_path(path);
- FSDataInputStream stream;
-
- try {
- stream = localFS.open(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
- return fileCount++;
- } catch (IOException e) {}
- return -1; // failure
- }
-
- protected int ceph_open_for_overwrite(String path, int mode) {
- path = prepare_path(path);
- FSDataOutputStream stream;
-
- try {
- stream = localFS.create(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
- return fileCount++;
- } catch (IOException e) {}
- return -1; // failure
- }
-
- protected int ceph_close(int filehandle) {
- LOG.info("ceph_close(filehandle " + filehandle + ")");
- try {
- ((Closeable) files.get(new Integer(filehandle))).close();
- if (null == files.get(new Integer(filehandle))) {
- return -ENOENT; // this isn't quite the right error code,
- // but the important part is it's negative
- }
- return 0; // hurray, success
- } catch (NullPointerException ne) {
- LOG.warn("ceph_close caught NullPointerException!" + ne);
- } // err, how?
- catch (IOException ie) {
- LOG.warn("ceph_close caught IOException!" + ie);
- }
- return -1; // failure
- }
-
- protected boolean ceph_setPermission(String pth, int mode) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- boolean ret = false;
-
- try {
- localFS.setPermission(path, new FsPermission((short) mode));
- ret = true;
- } catch (IOException e) {}
- return ret;
- }
-
- // rather than try and match a Ceph deployment's behavior exactly,
- // just make bad things happen if they try and call methods after this
- protected boolean ceph_kill_client() {
- // debug("ceph_kill_client", INFO);
- localFS.setWorkingDirectory(new Path(localPrefix));
- // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
- try {
- localFS.close();
- } catch (Exception e) {}
- localFS = null;
- files = null;
- filenames = null;
- return true;
- }
-
- protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- boolean ret = false;
-
- try {
- FileStatus status = localFS.getFileStatus(path);
-
- fill.size = status.getLen();
- fill.is_dir = status.isDir();
- fill.block_size = status.getBlockSize();
- fill.mod_time = status.getModificationTime();
- fill.access_time = status.getAccessTime();
- fill.mode = status.getPermission().toShort();
- ret = true;
- } catch (IOException e) {}
- return ret;
- }
-
- protected int ceph_replication(String path) {
- path = prepare_path(path);
- int ret = -1; // -1 for failure
-
- try {
- ret = localFS.getFileStatus(new Path(path)).getReplication();
- } catch (IOException e) {}
- return ret;
- }
-
- protected String[] ceph_hosts(int fh, long offset) {
- String[] ret = null;
-
- try {
- BlockLocation[] locs = localFS.getFileBlockLocations(
- localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
- offset, 1);
-
- ret = locs[0].getNames();
- } catch (IOException e) {} catch (NullPointerException f) {}
- return ret;
- }
-
- protected int ceph_setTimes(String pth, long mtime, long atime) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- int ret = -1; // generic fail
-
- try {
- localFS.setTimes(path, mtime, atime);
- ret = 0;
- } catch (IOException e) {}
- return ret;
- }
-
- protected long ceph_getpos(int fh) {
- long ret = -1; // generic fail
-
- try {
- Object stream = files.get(new Integer(fh));
-
- if (stream instanceof FSDataInputStream) {
- ret = ((FSDataInputStream) stream).getPos();
- } else if (stream instanceof FSDataOutputStream) {
- ret = ((FSDataOutputStream) stream).getPos();
- }
- } catch (IOException e) {} catch (NullPointerException f) {}
- return ret;
- }
-
- protected int ceph_write(int fh, byte[] buffer,
- int buffer_offset, int length) {
- LOG.info(
- "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
- + length);
- long ret = -1; // generic fail
-
- try {
- FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
-
- LOG.info("ceph_write got outputstream");
- long startPos = os.getPos();
-
- os.write(buffer, buffer_offset, length);
- ret = os.getPos() - startPos;
- } catch (IOException e) {
- LOG.warn("ceph_write caught IOException!");
- } catch (NullPointerException f) {
- LOG.warn("ceph_write caught NullPointerException!");
- }
- return (int) ret;
- }
-
- protected int ceph_read(int fh, byte[] buffer,
- int buffer_offset, int length) {
- long ret = -1; // generic fail
-
- try {
- FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
- long startPos = is.getPos();
-
- is.read(buffer, buffer_offset, length);
- ret = is.getPos() - startPos;
- } catch (IOException e) {} catch (NullPointerException f) {}
- return (int) ret;
- }
-
- protected long ceph_seek_from_start(int fh, long pos) {
- LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
- long ret = -1; // generic fail
-
- try {
- LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
- if (null == files.get(new Integer(fh))) {
- LOG.warn("ceph_seek_from_start: is is null!");
- }
- FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
-
- LOG.info("ceph_seek_from_start retrieved is!");
- is.seek(pos);
- ret = is.getPos();
- } catch (IOException e) {
- LOG.warn("ceph_seek_from_start caught IOException!");
- } catch (NullPointerException f) {
- LOG.warn("ceph_seek_from_start caught NullPointerException!");
- }
- return (int) ret;
- }
-
- /*
- * We need to remove the localFS file prefix before returning to Ceph
- */
- private String sanitize_path(String path) {
- // debug("sanitize_path(" + path + ")", INFO);
- /* if (path.startsWith("file:"))
- path = path.substring("file:".length()); */
- if (path.startsWith(localPrefix)) {
- path = path.substring(localPrefix.length());
- if (path.length() == 0) { // it was a root path
- path = "/";
- }
- }
- // debug("sanitize_path returning " + path, INFO);
- return path;
- }
-
- /*
- * If it's an absolute path we need to shove the
- * test dir onto the front as a prefix.
- */
- private String prepare_path(String path) {
- // debug("prepare_path(" + path + ")", INFO);
- if (path.startsWith("/")) {
- path = localPrefix + path;
- } else if (path.equals("..")) {
- if (ceph_getcwd().equals("/")) {
- path = ".";
- } // you can't go up past root!
- }
- // debug("prepare_path returning" + path, INFO);
- return path;
- }
-}