summaryrefslogtreecommitdiff
path: root/src/client/hadoop/ceph/CephOutputStream.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/hadoop/ceph/CephOutputStream.java')
-rw-r--r--src/client/hadoop/ceph/CephOutputStream.java219
1 files changed, 0 insertions, 219 deletions
diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java
deleted file mode 100644
index 4c50f88467d..00000000000
--- a/src/client/hadoop/ceph/CephOutputStream.java
+++ /dev/null
@@ -1,219 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Implements the Hadoop FS interfaces to allow applications to store
- * files in Ceph.
- */
-
-package org.apache.hadoop.fs.ceph;
-
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
-
-
-/**
- * <p>
- * An {@link OutputStream} for a CephFileSystem and corresponding
- * Ceph instance.
- */
-public class CephOutputStream extends OutputStream {
- private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
- private boolean closed;
-
- private CephFS ceph;
-
- private int fileHandle;
-
- private byte[] buffer;
- private int bufUsed = 0;
-
- /**
- * Construct the CephOutputStream.
- * @param conf The FileSystem configuration.
- * @param fh The Ceph filehandle to connect to.
- */
- public CephOutputStream(Configuration conf, CephFS cephfs,
- int fh, int bufferSize) {
- ceph = cephfs;
- fileHandle = fh;
- closed = false;
- buffer = new byte[bufferSize];
- }
-
- /** Ceph likes things to be closed before it shuts down,
- *so closing the IOStream stuff voluntarily is good
- */
- protected void finalize() throws Throwable {
- try {
- if (!closed) {
- close();
- }
- } finally {
- super.finalize();
- }
- }
-
- /**
- * Get the current position in the file.
- * @return The file offset in bytes.
- */
- public long getPos() throws IOException {
- return ceph.ceph_getpos(fileHandle);
- }
-
- /**
- * Write a byte.
- * @param b The byte to write.
- * @throws IOException If you have closed the CephOutputStream or the
- * write fails.
- */
- @Override
- public synchronized void write(int b) throws IOException {
- LOG.trace(
- "CephOutputStream.write: writing a single byte to fd " + fileHandle);
-
- if (closed) {
- throw new IOException(
- "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
- + ": stream closed");
- }
- // Stick the byte in a buffer and write it
- byte buf[] = new byte[1];
-
- buf[0] = (byte) b;
- write(buf, 0, 1);
- return;
- }
-
- /**
- * Write a byte buffer into the Ceph file.
- * @param buf the byte array to write from
- * @param off the position in the file to start writing at.
- * @param len The number of bytes to actually write.
- * @throws IOException if you have closed the CephOutputStream, or
- * if buf is null or off + len > buf.length, or
- * if the write fails due to a Ceph error.
- */
- @Override
- public synchronized void write(byte buf[], int off, int len) throws IOException {
- LOG.trace(
- "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
- // make sure stream is open
- if (closed) {
- throw new IOException(
- "CephOutputStream.write: cannot write " + len + "bytes to fd "
- + fileHandle + ": stream closed");
- }
-
- int result;
- int write;
-
- while (len > 0) {
- write = Math.min(len, buffer.length - bufUsed);
- try {
- System.arraycopy(buf, off, buffer, bufUsed, write);
- } catch (IndexOutOfBoundsException ie) {
- throw new IOException(
- "CephOutputStream.write: Indices out of bounds: "
- + "write length is " + len + ", buffer offset is " + off
- + ", and buffer size is " + buf.length);
- } catch (ArrayStoreException ae) {
- throw new IOException(
- "Uh-oh, CephOutputStream failed to do an array"
- + " copy due to type mismatch...");
- } catch (NullPointerException ne) {
- throw new IOException(
- "CephOutputStream.write: cannot write " + len + "bytes to fd "
- + fileHandle + ": buffer is null");
- }
- bufUsed += write;
- len -= write;
- off += write;
- if (bufUsed == buffer.length) {
- result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
- if (result < 0) {
- throw new IOException(
- "CephOutputStream.write: Buffered write of " + bufUsed
- + " bytes failed!");
- }
- if (result != bufUsed) {
- throw new IOException(
- "CephOutputStream.write: Wrote only " + result + " bytes of "
- + bufUsed + " in buffer! Data may be lost or written"
- + " twice to Ceph!");
- }
- bufUsed = 0;
- }
-
- }
- return;
- }
-
- /**
- * Flush the buffered data.
- * @throws IOException if you've closed the stream or the write fails.
- */
- @Override
- public synchronized void flush() throws IOException {
- if (!closed) {
- if (bufUsed == 0) {
- return;
- }
- int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-
- if (result < 0) {
- throw new IOException(
- "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
- + fileHandle + " failed");
- }
- if (result != bufUsed) {
- throw new IOException(
- "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
- + fileHandle + "was incomplete: only " + result + " of " + bufUsed
- + " bytes were written.");
- }
- bufUsed = 0;
- return;
- }
- }
-
- /**
- * Close the CephOutputStream.
- * @throws IOException if Ceph somehow returns an error. In current code it can't.
- */
- @Override
- public synchronized void close() throws IOException {
- LOG.trace("CephOutputStream.close:enter");
- if (!closed) {
- flush();
- int result = ceph.ceph_close(fileHandle);
-
- if (result != 0) {
- throw new IOException("Close failed!");
- }
-
- closed = true;
- LOG.trace("CephOutputStream.close:exit");
- }
- }
-}