diff options
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java')
-rw-r--r-- | lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java | 339 |
1 files changed, 155 insertions, 184 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java b/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java index 61b68d279..713d52103 100644 --- a/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java +++ b/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java @@ -21,22 +21,21 @@ package org.apache.thrift.transport; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.IOException; import java.util.Random; - import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * FileTransport implementation of the TTransport interface. - * Currently this is a straightforward port of the cpp implementation + * FileTransport implementation of the TTransport interface. Currently this is a straightforward + * port of the cpp implementation * - * It may make better sense to provide a basic stream access on top of the framed file format - * The FileTransport can then be a user of this framed file format with some additional logic - * for chunking. + * <p>It may make better sense to provide a basic stream access on top of the framed file format The + * FileTransport can then be a user of this framed file format with some additional logic for + * chunking. */ public class TFileTransport extends TTransport { @@ -46,15 +45,16 @@ public class TFileTransport extends TTransport { public void trunc() { pos = count = 0; } + public TruncableBufferedInputStream(InputStream in) { super(in); } + public TruncableBufferedInputStream(InputStream in, int size) { super(in, size); } } - public static class Event { private byte[] buf_; private int nread_; @@ -70,64 +70,81 @@ public class TFileTransport extends TTransport { nread_ = navailable_ = 0; } - public byte[] getBuf() { return buf_;} - public int getSize() { return buf_.length; } + public byte[] getBuf() { + return buf_; + } + public int getSize() { + return buf_.length; + } - public void setAvailable(int sz) { nread_ = 0; navailable_=sz;} - public int getRemaining() { return (navailable_ - nread_); } + public void setAvailable(int sz) { + nread_ = 0; + navailable_ = sz; + } + + public int getRemaining() { + return (navailable_ - nread_); + } public int emit(byte[] buf, int offset, int ndesired) { - if((ndesired == 0) || (ndesired > getRemaining())) - ndesired = getRemaining(); + if ((ndesired == 0) || (ndesired > getRemaining())) ndesired = getRemaining(); - if(ndesired <= 0) - return (ndesired); + if (ndesired <= 0) return (ndesired); System.arraycopy(buf_, nread_, buf, offset, ndesired); nread_ += ndesired; - return(ndesired); + return (ndesired); } } - public static class ChunkState { - /** - * Chunk Size. Must be same across all implementations - */ + public static class ChunkState { + /** Chunk Size. Must be same across all implementations */ public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; private int chunk_size_ = DEFAULT_CHUNK_SIZE; private long offset_ = 0; public ChunkState() {} - public ChunkState(int chunk_size) { chunk_size_ = chunk_size; } - public void skip(int size) {offset_ += size; } - public void seek(long offset) {offset_ = offset;} + public ChunkState(int chunk_size) { + chunk_size_ = chunk_size; + } - public int getChunkSize() { return chunk_size_;} - public int getChunkNum() { return ((int)(offset_/chunk_size_));} - public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));} - public long getOffset() { return (offset_);} + public void skip(int size) { + offset_ += size; + } + + public void seek(long offset) { + offset_ = offset; + } + + public int getChunkSize() { + return chunk_size_; + } + + public int getChunkNum() { + return ((int) (offset_ / chunk_size_)); + } + + public int getRemaining() { + return (chunk_size_ - ((int) (offset_ % chunk_size_))); + } + + public long getOffset() { + return (offset_); + } } public enum TailPolicy { - NOWAIT(0, 0), - WAIT_FOREVER(500, -1); + WAIT_FOREVER(500, -1); - /** - * Time in milliseconds to sleep before next read - * If 0, no sleep - */ + /** Time in milliseconds to sleep before next read If 0, no sleep */ public final int timeout_; - /** - * Number of retries before giving up - * if 0, no retries - * if -1, retry forever - */ + /** Number of retries before giving up if 0, no retries if -1, retry forever */ public final int retries_; /** @@ -136,48 +153,31 @@ public class TFileTransport extends TTransport { * @param timeout sleep time for this particular policy * @param retries number of retries */ - TailPolicy(int timeout, int retries) { timeout_ = timeout; retries_ = retries; } } - /** - * Current tailing policy - */ + /** Current tailing policy */ TailPolicy currentPolicy_ = TailPolicy.NOWAIT; - - /** - * Underlying file being read - */ + /** Underlying file being read */ protected TSeekableFile inputFile_ = null; - /** - * Underlying outputStream - */ + /** Underlying outputStream */ protected OutputStream outputStream_ = null; - - /** - * Event currently read in - */ + /** Event currently read in */ Event currentEvent_ = null; - /** - * InputStream currently being used for reading - */ + /** InputStream currently being used for reading */ InputStream inputStream_ = null; - /** - * current Chunk state - */ + /** current Chunk state */ ChunkState cs = null; - /** - * is read only? - */ + /** is read only? */ private boolean readOnly_ = false; /** @@ -201,7 +201,6 @@ public class TFileTransport extends TTransport { return (old); } - /** * Initialize read input stream * @@ -210,8 +209,8 @@ public class TFileTransport extends TTransport { private InputStream createInputStream() throws TTransportException { InputStream is; try { - if(inputStream_ != null) { - ((TruncableBufferedInputStream)inputStream_).trunc(); + if (inputStream_ != null) { + ((TruncableBufferedInputStream) inputStream_).trunc(); is = inputStream_; } else { is = new TruncableBufferedInputStream(inputFile_.getInputStream()); @@ -219,7 +218,7 @@ public class TFileTransport extends TTransport { } catch (IOException iox) { throw new TTransportException(iox.getMessage(), iox); } - return(is); + return (is); } /** @@ -229,18 +228,17 @@ public class TFileTransport extends TTransport { * @param buf Buffer to read into * @param off Offset in buffer to read into * @param len Number of bytes to read - * @param tp policy to use if we hit EOF - * + * @param tp policy to use if we hit EOF * @return number of bytes read */ - private int tailRead(InputStream is, byte[] buf, - int off, int len, TailPolicy tp) throws TTransportException { + private int tailRead(InputStream is, byte[] buf, int off, int len, TailPolicy tp) + throws TTransportException { int orig_len = len; try { int retries = 0; - while(len > 0) { + while (len > 0) { int cnt = is.read(buf, off, len); - if(cnt > 0) { + if (cnt > 0) { off += cnt; len -= cnt; retries = 0; @@ -249,40 +247,40 @@ public class TFileTransport extends TTransport { // EOF retries++; - if((tp.retries_ != -1) && tp.retries_ < retries) - return (orig_len - len); + if ((tp.retries_ != -1) && tp.retries_ < retries) return (orig_len - len); - if(tp.timeout_ > 0) { - try {Thread.sleep(tp.timeout_);} catch(InterruptedException e) {} + if (tp.timeout_ > 0) { + try { + Thread.sleep(tp.timeout_); + } catch (InterruptedException e) { + } } } else { // either non-zero or -1 is what the contract says! - throw new - TTransportException("Unexpected return from InputStream.read = " - + cnt); + throw new TTransportException("Unexpected return from InputStream.read = " + cnt); } } } catch (IOException iox) { throw new TTransportException(iox.getMessage(), iox); } - return(orig_len - len); + return (orig_len - len); } /** * Event is corrupted. Do recovery * - * @return true if recovery could be performed and we can read more data - * false is returned only when nothing more can be read + * @return true if recovery could be performed and we can read more data false is returned only + * when nothing more can be read */ private boolean performRecovery() throws TTransportException { int numChunks = getNumChunks(); int curChunk = cs.getChunkNum(); - if(curChunk >= (numChunks-1)) { + if (curChunk >= (numChunks - 1)) { return false; } - seekToChunk(curChunk+1); + seekToChunk(curChunk + 1); return true; } @@ -301,27 +299,27 @@ public class TFileTransport extends TTransport { do { // corner case. read to end of chunk nrequested = cs.getRemaining(); - if(nrequested < 4) { + if (nrequested < 4) { nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_); - if(nread != nrequested) { - return(false); + if (nread != nrequested) { + return (false); } } // assuming serialized on little endian machine nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_); - if(nread != 4) { - return(false); + if (nread != 4) { + return (false); } - esize=0; - for(int i=3; i>=0; i--) { - int val = (0x000000ff & (int)ebytes[i]); - esize |= (val << (i*8)); + esize = 0; + for (int i = 3; i >= 0; i--) { + int val = (0x000000ff & (int) ebytes[i]); + esize |= (val << (i * 8)); } // check if event is corrupted and do recovery as required - if(esize > cs.getRemaining()) { + if (esize > cs.getRemaining()) { throw new TTransportException("FileTransport error: bad event size"); /* if(performRecovery()) { @@ -334,17 +332,16 @@ public class TFileTransport extends TTransport { } while (esize == 0); // reset existing event or get a larger one - if(currentEvent_.getSize() < esize) - currentEvent_ = new Event(new byte [esize]); + if (currentEvent_.getSize() < esize) currentEvent_ = new Event(new byte[esize]); // populate the event byte[] buf = currentEvent_.getBuf(); nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_); - if(nread != esize) { - return(false); + if (nread != esize) { + return (false); } currentEvent_.setAvailable(esize); - return(true); + return (true); } /** @@ -356,37 +353,31 @@ public class TFileTransport extends TTransport { return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null))); } - /** - * Diverging from the cpp model and sticking to the TSocket model - * Files are not opened in ctor - but in explicit open call + * Diverging from the cpp model and sticking to the TSocket model Files are not opened in ctor - + * but in explicit open call */ public void open() throws TTransportException { - if (isOpen()) - throw new TTransportException(TTransportException.ALREADY_OPEN); + if (isOpen()) throw new TTransportException(TTransportException.ALREADY_OPEN); try { inputStream_ = createInputStream(); cs = new ChunkState(); - currentEvent_ = new Event(new byte [256]); + currentEvent_ = new Event(new byte[256]); - if(!readOnly_) - outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream()); + if (!readOnly_) outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream()); } catch (IOException iox) { throw new TTransportException(TTransportException.NOT_OPEN, iox); } } - /** - * Closes the transport. - */ + /** Closes the transport. */ public void close() { if (inputFile_ != null) { try { inputFile_.close(); } catch (IOException iox) { - LOGGER.warn("WARNING: Error closing input file: " + - iox.getMessage()); + LOGGER.warn("WARNING: Error closing input file: " + iox.getMessage()); } inputFile_ = null; } @@ -394,14 +385,12 @@ public class TFileTransport extends TTransport { try { outputStream_.close(); } catch (IOException iox) { - LOGGER.warn("WARNING: Error closing output stream: " + - iox.getMessage()); + LOGGER.warn("WARNING: Error closing output stream: " + iox.getMessage()); } outputStream_ = null; } } - /** * File Transport ctor * @@ -425,30 +414,26 @@ public class TFileTransport extends TTransport { readOnly_ = readOnly; } - /** - * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception - * where one is detected. + * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception where one + * is detected. */ - public int readAll(byte[] buf, int off, int len) - throws TTransportException { + public int readAll(byte[] buf, int off, int len) throws TTransportException { int got = 0; int ret = 0; while (got < len) { - ret = read(buf, off+got, len-got); + ret = read(buf, off + got, len - got); if (ret < 0) { throw new TTransportException("Error in reading from file"); } - if(ret == 0) { - throw new TTransportException(TTransportException.END_OF_FILE, - "End of File reached"); + if (ret == 0) { + throw new TTransportException(TTransportException.END_OF_FILE, "End of File reached"); } got += ret; } return got; } - /** * Reads up to len bytes into buffer buf, starting at offset off. * @@ -459,13 +444,11 @@ public class TFileTransport extends TTransport { * @throws TTransportException if there was an error reading data */ public int read(byte[] buf, int off, int len) throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, - "Must open before reading"); + if (!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before reading"); - if(currentEvent_.getRemaining() == 0) { - if(!readEvent()) - return(0); + if (currentEvent_.getRemaining() == 0) { + if (!readEvent()) return (0); } int nread = currentEvent_.emit(buf, off, len); @@ -473,15 +456,12 @@ public class TFileTransport extends TTransport { } public int getNumChunks() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, - "Must open before getNumChunks"); + if (!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getNumChunks"); try { long len = inputFile_.length(); - if(len == 0) - return 0; - else - return (((int)(len/cs.getChunkSize())) + 1); + if (len == 0) return 0; + else return (((int) (len / cs.getChunkSize())) + 1); } catch (IOException iox) { throw new TTransportException(iox.getMessage(), iox); @@ -489,18 +469,14 @@ public class TFileTransport extends TTransport { } public int getCurChunk() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, - "Must open before getCurChunk"); + if (!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getCurChunk"); return (cs.getChunkNum()); - } - public void seekToChunk(int chunk) throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, - "Must open before seeking"); + if (!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking"); int numChunks = getNumChunks(); @@ -519,45 +495,47 @@ public class TFileTransport extends TTransport { chunk = 0; } - long eofOffset=0; + long eofOffset = 0; boolean seekToEnd = (chunk >= numChunks); - if(seekToEnd) { + if (seekToEnd) { chunk = chunk - 1; - try { eofOffset = inputFile_.length(); } - catch (IOException iox) {throw new TTransportException(iox.getMessage(), - iox);} + try { + eofOffset = inputFile_.length(); + } catch (IOException iox) { + throw new TTransportException(iox.getMessage(), iox); + } } - if(chunk*cs.getChunkSize() != cs.getOffset()) { - try { inputFile_.seek((long)chunk*cs.getChunkSize()); } - catch (IOException iox) { - throw new TTransportException("Seek to chunk " + - chunk + " " +iox.getMessage(), iox); + if (chunk * cs.getChunkSize() != cs.getOffset()) { + try { + inputFile_.seek((long) chunk * cs.getChunkSize()); + } catch (IOException iox) { + throw new TTransportException("Seek to chunk " + chunk + " " + iox.getMessage(), iox); } - cs.seek((long)chunk*cs.getChunkSize()); + cs.seek((long) chunk * cs.getChunkSize()); currentEvent_.setAvailable(0); inputStream_ = createInputStream(); } - if(seekToEnd) { + if (seekToEnd) { // waiting forever here - otherwise we can hit EOF and end up // having consumed partial data from the data stream. TailPolicy old = setTailPolicy(TailPolicy.WAIT_FOREVER); - while(cs.getOffset() < eofOffset) { readEvent(); } + while (cs.getOffset() < eofOffset) { + readEvent(); + } currentEvent_.setAvailable(0); setTailPolicy(old); } } public void seekToEnd() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, - "Must open before seeking"); + if (!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking"); seekToChunk(getNumChunks()); } - /** * Writes up to len bytes from the buffer. * @@ -579,36 +557,30 @@ public class TFileTransport extends TTransport { throw new TTransportException("Not Supported"); } - @Override public TConfiguration getConfiguration() { return null; } @Override - public void updateKnownMessageSize(long size) throws TTransportException { - - } + public void updateKnownMessageSize(long size) throws TTransportException {} @Override - public void checkReadBytesAvailable(long numBytes) throws TTransportException { - - } + public void checkReadBytesAvailable(long numBytes) throws TTransportException {} - /** - * test program - * - */ + /** test program */ public static void main(String[] args) throws Exception { int num_chunks = 10; - if((args.length < 1) || args[0].equals("--help") - || args[0].equals("-h") || args[0].equals("-?")) { + if ((args.length < 1) + || args[0].equals("--help") + || args[0].equals("-h") + || args[0].equals("-?")) { printUsage(); } - if(args.length > 1) { + if (args.length > 1) { try { num_chunks = Integer.parseInt(args[1]); } catch (Exception e) { @@ -619,15 +591,15 @@ public class TFileTransport extends TTransport { TFileTransport t = new TFileTransport(args[0], true); t.open(); - LOGGER.info("NumChunks="+t.getNumChunks()); + LOGGER.info("NumChunks=" + t.getNumChunks()); Random r = new Random(); - for(int j=0; j<num_chunks; j++) { + for (int j = 0; j < num_chunks; j++) { byte[] buf = new byte[4096]; - int cnum = r.nextInt(t.getNumChunks()-1); - LOGGER.info("Reading chunk "+cnum); + int cnum = r.nextInt(t.getNumChunks() - 1); + LOGGER.info("Reading chunk " + cnum); t.seekToChunk(cnum); - for(int i=0; i<4096; i++) { + for (int i = 0; i < 4096; i++) { t.read(buf, 0, 4096); } } @@ -638,5 +610,4 @@ public class TFileTransport extends TTransport { LOGGER.error(" (Opens and reads num_chunks chunks from file randomly)"); System.exit(1); } - } |