diff options
Diffstat (limited to 'src/client/hadoop/ceph/CephInputStream.java')
-rw-r--r-- | src/client/hadoop/ceph/CephInputStream.java | 254 |
1 files changed, 0 insertions, 254 deletions
diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java deleted file mode 100644 index d9668d031ba..00000000000 --- a/src/client/hadoop/ceph/CephInputStream.java +++ /dev/null @@ -1,254 +0,0 @@ -// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- - -/** - * - * Licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - * - * - * Implements the Hadoop FS interfaces to allow applications to store - * files in Ceph. - */ -package org.apache.hadoop.fs.ceph; - - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSInputStream; - - -/** - * <p> - * An {@link FSInputStream} for a CephFileSystem and corresponding - * Ceph instance. - */ -public class CephInputStream extends FSInputStream { - private static final Log LOG = LogFactory.getLog(CephInputStream.class); - private boolean closed; - - private int fileHandle; - - private long fileLength; - - private CephFS ceph; - - private byte[] buffer; - private int bufPos = 0; - private int bufValid = 0; - private long cephPos = 0; - - /** - * Create a new CephInputStream. - * @param conf The system configuration. Unused. - * @param fh The filehandle provided by Ceph to reference. - * @param flength The current length of the file. If the length changes - * you will need to close and re-open it to access the new data. - */ - public CephInputStream(Configuration conf, CephFS cephfs, - int fh, long flength, int bufferSize) { - // Whoever's calling the constructor is responsible for doing the actual ceph_open - // call and providing the file handle. - fileLength = flength; - fileHandle = fh; - closed = false; - ceph = cephfs; - buffer = new byte[bufferSize]; - LOG.debug( - "CephInputStream constructor: initializing stream with fh " + fh - + " and file length " + flength); - - } - - /** Ceph likes things to be closed before it shuts down, - * so closing the IOStream stuff voluntarily in a finalizer is good - */ - protected void finalize() throws Throwable { - try { - if (!closed) { - close(); - } - } finally { - super.finalize(); - } - } - - private synchronized boolean fillBuffer() throws IOException { - bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length); - bufPos = 0; - if (bufValid < 0) { - int err = bufValid; - - bufValid = 0; - // attempt to reset to old position. If it fails, too bad. - ceph.ceph_seek_from_start(fileHandle, cephPos); - throw new IOException("Failed to fill read buffer! Error code:" + err); - } - cephPos += bufValid; - return (bufValid != 0); - } - - /* - * Get the current position of the stream. - */ - public synchronized long getPos() throws IOException { - return cephPos - bufValid + bufPos; - } - - /** - * Find the number of bytes remaining in the file. - */ - @Override - public synchronized int available() throws IOException { - return (int) (fileLength - getPos()); - } - - public synchronized void seek(long targetPos) throws IOException { - LOG.trace( - "CephInputStream.seek: Seeking to position " + targetPos + " on fd " - + fileHandle); - if (targetPos > fileLength) { - throw new IOException( - "CephInputStream.seek: failed seek to position " + targetPos - + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); - } - long oldPos = cephPos; - - cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos); - bufValid = 0; - bufPos = 0; - if (cephPos < 0) { - cephPos = oldPos; - throw new IOException("Ceph failed to seek to new position!"); - } - } - - /** - * Failovers are handled by the Ceph code at a very low level; - * if there are issues that can be solved by changing sources - * they'll be dealt with before anybody even tries to call this method! - * @return false. - */ - public synchronized boolean seekToNewSource(long targetPos) { - return false; - } - - /** - * Read a byte from the file. - * @return the next byte. - */ - @Override - public synchronized int read() throws IOException { - LOG.trace( - "CephInputStream.read: Reading a single byte from fd " + fileHandle - + " by calling general read function"); - - byte result[] = new byte[1]; - - if (getPos() >= fileLength) { - return -1; - } - if (-1 == read(result, 0, 1)) { - return -1; - } - if (result[0] < 0) { - return 256 + (int) result[0]; - } else { - return result[0]; - } - } - - /** - * Read a specified number of bytes from the file into a byte[]. - * @param buf the byte array to read into. - * @param off the offset to start at in the file - * @param len the number of bytes to read - * @return 0 if successful, otherwise an error code. - * @throws IOException on bad input. - */ - @Override - public synchronized int read(byte buf[], int off, int len) - throws IOException { - LOG.trace( - "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle); - - if (closed) { - throw new IOException( - "CephInputStream.read: cannot read " + len + " bytes from fd " - + fileHandle + ": stream closed"); - } - - // ensure we're not past the end of the file - if (getPos() >= fileLength) { - LOG.debug( - "CephInputStream.read: cannot read " + len + " bytes from fd " - + fileHandle + ": current position is " + getPos() - + " and file length is " + fileLength); - - return -1; - } - - int totalRead = 0; - int initialLen = len; - int read; - - do { - read = Math.min(len, bufValid - bufPos); - try { - System.arraycopy(buffer, bufPos, buf, off, read); - } catch (IndexOutOfBoundsException ie) { - throw new IOException( - "CephInputStream.read: Indices out of bounds:" + "read length is " - + len + ", buffer offset is " + off + ", and buffer size is " - + buf.length); - } catch (ArrayStoreException ae) { - throw new IOException( - "Uh-oh, CephInputStream failed to do an array" - + "copy due to type mismatch..."); - } catch (NullPointerException ne) { - throw new IOException( - "CephInputStream.read: cannot read " + len + "bytes from fd:" - + fileHandle + ": buf is null"); - } - bufPos += read; - len -= read; - off += read; - totalRead += read; - } while (len > 0 && fillBuffer()); - - LOG.trace( - "CephInputStream.read: Reading " + initialLen + " bytes from fd " - + fileHandle + ": succeeded in reading " + totalRead + " bytes"); - return totalRead; - } - - /** - * Close the CephInputStream and release the associated filehandle. - */ - @Override - public void close() throws IOException { - LOG.trace("CephOutputStream.close:enter"); - if (!closed) { - int result = ceph.ceph_close(fileHandle); - - closed = true; - if (result != 0) { - throw new IOException( - "Close somehow failed!" - + "Don't try and use this stream again, though"); - } - LOG.trace("CephOutputStream.close:exit"); - } - } -} |