summaryrefslogtreecommitdiff
path: root/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java
diff options
context:
space:
mode:
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java')
-rw-r--r--lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java339
1 files changed, 155 insertions, 184 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java b/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java
index 61b68d279..713d52103 100644
--- a/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java
+++ b/lib/java/src/main/java/org/apache/thrift/transport/TFileTransport.java
@@ -21,22 +21,21 @@ package org.apache.thrift.transport;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.IOException;
import java.util.Random;
-
import org.apache.thrift.TConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * FileTransport implementation of the TTransport interface.
- * Currently this is a straightforward port of the cpp implementation
+ * FileTransport implementation of the TTransport interface. Currently this is a straightforward
+ * port of the cpp implementation
*
- * It may make better sense to provide a basic stream access on top of the framed file format
- * The FileTransport can then be a user of this framed file format with some additional logic
- * for chunking.
+ * <p>It may make better sense to provide a basic stream access on top of the framed file format The
+ * FileTransport can then be a user of this framed file format with some additional logic for
+ * chunking.
*/
public class TFileTransport extends TTransport {
@@ -46,15 +45,16 @@ public class TFileTransport extends TTransport {
public void trunc() {
pos = count = 0;
}
+
public TruncableBufferedInputStream(InputStream in) {
super(in);
}
+
public TruncableBufferedInputStream(InputStream in, int size) {
super(in, size);
}
}
-
public static class Event {
private byte[] buf_;
private int nread_;
@@ -70,64 +70,81 @@ public class TFileTransport extends TTransport {
nread_ = navailable_ = 0;
}
- public byte[] getBuf() { return buf_;}
- public int getSize() { return buf_.length; }
+ public byte[] getBuf() {
+ return buf_;
+ }
+ public int getSize() {
+ return buf_.length;
+ }
- public void setAvailable(int sz) { nread_ = 0; navailable_=sz;}
- public int getRemaining() { return (navailable_ - nread_); }
+ public void setAvailable(int sz) {
+ nread_ = 0;
+ navailable_ = sz;
+ }
+
+ public int getRemaining() {
+ return (navailable_ - nread_);
+ }
public int emit(byte[] buf, int offset, int ndesired) {
- if((ndesired == 0) || (ndesired > getRemaining()))
- ndesired = getRemaining();
+ if ((ndesired == 0) || (ndesired > getRemaining())) ndesired = getRemaining();
- if(ndesired <= 0)
- return (ndesired);
+ if (ndesired <= 0) return (ndesired);
System.arraycopy(buf_, nread_, buf, offset, ndesired);
nread_ += ndesired;
- return(ndesired);
+ return (ndesired);
}
}
- public static class ChunkState {
- /**
- * Chunk Size. Must be same across all implementations
- */
+ public static class ChunkState {
+ /** Chunk Size. Must be same across all implementations */
public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
private int chunk_size_ = DEFAULT_CHUNK_SIZE;
private long offset_ = 0;
public ChunkState() {}
- public ChunkState(int chunk_size) { chunk_size_ = chunk_size; }
- public void skip(int size) {offset_ += size; }
- public void seek(long offset) {offset_ = offset;}
+ public ChunkState(int chunk_size) {
+ chunk_size_ = chunk_size;
+ }
- public int getChunkSize() { return chunk_size_;}
- public int getChunkNum() { return ((int)(offset_/chunk_size_));}
- public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));}
- public long getOffset() { return (offset_);}
+ public void skip(int size) {
+ offset_ += size;
+ }
+
+ public void seek(long offset) {
+ offset_ = offset;
+ }
+
+ public int getChunkSize() {
+ return chunk_size_;
+ }
+
+ public int getChunkNum() {
+ return ((int) (offset_ / chunk_size_));
+ }
+
+ public int getRemaining() {
+ return (chunk_size_ - ((int) (offset_ % chunk_size_)));
+ }
+
+ public long getOffset() {
+ return (offset_);
+ }
}
public enum TailPolicy {
-
NOWAIT(0, 0),
- WAIT_FOREVER(500, -1);
+ WAIT_FOREVER(500, -1);
- /**
- * Time in milliseconds to sleep before next read
- * If 0, no sleep
- */
+ /** Time in milliseconds to sleep before next read If 0, no sleep */
public final int timeout_;
- /**
- * Number of retries before giving up
- * if 0, no retries
- * if -1, retry forever
- */
+ /** Number of retries before giving up if 0, no retries if -1, retry forever */
public final int retries_;
/**
@@ -136,48 +153,31 @@ public class TFileTransport extends TTransport {
* @param timeout sleep time for this particular policy
* @param retries number of retries
*/
-
TailPolicy(int timeout, int retries) {
timeout_ = timeout;
retries_ = retries;
}
}
- /**
- * Current tailing policy
- */
+ /** Current tailing policy */
TailPolicy currentPolicy_ = TailPolicy.NOWAIT;
-
- /**
- * Underlying file being read
- */
+ /** Underlying file being read */
protected TSeekableFile inputFile_ = null;
- /**
- * Underlying outputStream
- */
+ /** Underlying outputStream */
protected OutputStream outputStream_ = null;
-
- /**
- * Event currently read in
- */
+ /** Event currently read in */
Event currentEvent_ = null;
- /**
- * InputStream currently being used for reading
- */
+ /** InputStream currently being used for reading */
InputStream inputStream_ = null;
- /**
- * current Chunk state
- */
+ /** current Chunk state */
ChunkState cs = null;
- /**
- * is read only?
- */
+ /** is read only? */
private boolean readOnly_ = false;
/**
@@ -201,7 +201,6 @@ public class TFileTransport extends TTransport {
return (old);
}
-
/**
* Initialize read input stream
*
@@ -210,8 +209,8 @@ public class TFileTransport extends TTransport {
private InputStream createInputStream() throws TTransportException {
InputStream is;
try {
- if(inputStream_ != null) {
- ((TruncableBufferedInputStream)inputStream_).trunc();
+ if (inputStream_ != null) {
+ ((TruncableBufferedInputStream) inputStream_).trunc();
is = inputStream_;
} else {
is = new TruncableBufferedInputStream(inputFile_.getInputStream());
@@ -219,7 +218,7 @@ public class TFileTransport extends TTransport {
} catch (IOException iox) {
throw new TTransportException(iox.getMessage(), iox);
}
- return(is);
+ return (is);
}
/**
@@ -229,18 +228,17 @@ public class TFileTransport extends TTransport {
* @param buf Buffer to read into
* @param off Offset in buffer to read into
* @param len Number of bytes to read
- * @param tp policy to use if we hit EOF
- *
+ * @param tp policy to use if we hit EOF
* @return number of bytes read
*/
- private int tailRead(InputStream is, byte[] buf,
- int off, int len, TailPolicy tp) throws TTransportException {
+ private int tailRead(InputStream is, byte[] buf, int off, int len, TailPolicy tp)
+ throws TTransportException {
int orig_len = len;
try {
int retries = 0;
- while(len > 0) {
+ while (len > 0) {
int cnt = is.read(buf, off, len);
- if(cnt > 0) {
+ if (cnt > 0) {
off += cnt;
len -= cnt;
retries = 0;
@@ -249,40 +247,40 @@ public class TFileTransport extends TTransport {
// EOF
retries++;
- if((tp.retries_ != -1) && tp.retries_ < retries)
- return (orig_len - len);
+ if ((tp.retries_ != -1) && tp.retries_ < retries) return (orig_len - len);
- if(tp.timeout_ > 0) {
- try {Thread.sleep(tp.timeout_);} catch(InterruptedException e) {}
+ if (tp.timeout_ > 0) {
+ try {
+ Thread.sleep(tp.timeout_);
+ } catch (InterruptedException e) {
+ }
}
} else {
// either non-zero or -1 is what the contract says!
- throw new
- TTransportException("Unexpected return from InputStream.read = "
- + cnt);
+ throw new TTransportException("Unexpected return from InputStream.read = " + cnt);
}
}
} catch (IOException iox) {
throw new TTransportException(iox.getMessage(), iox);
}
- return(orig_len - len);
+ return (orig_len - len);
}
/**
* Event is corrupted. Do recovery
*
- * @return true if recovery could be performed and we can read more data
- * false is returned only when nothing more can be read
+ * @return true if recovery could be performed and we can read more data false is returned only
+ * when nothing more can be read
*/
private boolean performRecovery() throws TTransportException {
int numChunks = getNumChunks();
int curChunk = cs.getChunkNum();
- if(curChunk >= (numChunks-1)) {
+ if (curChunk >= (numChunks - 1)) {
return false;
}
- seekToChunk(curChunk+1);
+ seekToChunk(curChunk + 1);
return true;
}
@@ -301,27 +299,27 @@ public class TFileTransport extends TTransport {
do {
// corner case. read to end of chunk
nrequested = cs.getRemaining();
- if(nrequested < 4) {
+ if (nrequested < 4) {
nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_);
- if(nread != nrequested) {
- return(false);
+ if (nread != nrequested) {
+ return (false);
}
}
// assuming serialized on little endian machine
nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_);
- if(nread != 4) {
- return(false);
+ if (nread != 4) {
+ return (false);
}
- esize=0;
- for(int i=3; i>=0; i--) {
- int val = (0x000000ff & (int)ebytes[i]);
- esize |= (val << (i*8));
+ esize = 0;
+ for (int i = 3; i >= 0; i--) {
+ int val = (0x000000ff & (int) ebytes[i]);
+ esize |= (val << (i * 8));
}
// check if event is corrupted and do recovery as required
- if(esize > cs.getRemaining()) {
+ if (esize > cs.getRemaining()) {
throw new TTransportException("FileTransport error: bad event size");
/*
if(performRecovery()) {
@@ -334,17 +332,16 @@ public class TFileTransport extends TTransport {
} while (esize == 0);
// reset existing event or get a larger one
- if(currentEvent_.getSize() < esize)
- currentEvent_ = new Event(new byte [esize]);
+ if (currentEvent_.getSize() < esize) currentEvent_ = new Event(new byte[esize]);
// populate the event
byte[] buf = currentEvent_.getBuf();
nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_);
- if(nread != esize) {
- return(false);
+ if (nread != esize) {
+ return (false);
}
currentEvent_.setAvailable(esize);
- return(true);
+ return (true);
}
/**
@@ -356,37 +353,31 @@ public class TFileTransport extends TTransport {
return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null)));
}
-
/**
- * Diverging from the cpp model and sticking to the TSocket model
- * Files are not opened in ctor - but in explicit open call
+ * Diverging from the cpp model and sticking to the TSocket model Files are not opened in ctor -
+ * but in explicit open call
*/
public void open() throws TTransportException {
- if (isOpen())
- throw new TTransportException(TTransportException.ALREADY_OPEN);
+ if (isOpen()) throw new TTransportException(TTransportException.ALREADY_OPEN);
try {
inputStream_ = createInputStream();
cs = new ChunkState();
- currentEvent_ = new Event(new byte [256]);
+ currentEvent_ = new Event(new byte[256]);
- if(!readOnly_)
- outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream());
+ if (!readOnly_) outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream());
} catch (IOException iox) {
throw new TTransportException(TTransportException.NOT_OPEN, iox);
}
}
- /**
- * Closes the transport.
- */
+ /** Closes the transport. */
public void close() {
if (inputFile_ != null) {
try {
inputFile_.close();
} catch (IOException iox) {
- LOGGER.warn("WARNING: Error closing input file: " +
- iox.getMessage());
+ LOGGER.warn("WARNING: Error closing input file: " + iox.getMessage());
}
inputFile_ = null;
}
@@ -394,14 +385,12 @@ public class TFileTransport extends TTransport {
try {
outputStream_.close();
} catch (IOException iox) {
- LOGGER.warn("WARNING: Error closing output stream: " +
- iox.getMessage());
+ LOGGER.warn("WARNING: Error closing output stream: " + iox.getMessage());
}
outputStream_ = null;
}
}
-
/**
* File Transport ctor
*
@@ -425,30 +414,26 @@ public class TFileTransport extends TTransport {
readOnly_ = readOnly;
}
-
/**
- * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception
- * where one is detected.
+ * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception where one
+ * is detected.
*/
- public int readAll(byte[] buf, int off, int len)
- throws TTransportException {
+ public int readAll(byte[] buf, int off, int len) throws TTransportException {
int got = 0;
int ret = 0;
while (got < len) {
- ret = read(buf, off+got, len-got);
+ ret = read(buf, off + got, len - got);
if (ret < 0) {
throw new TTransportException("Error in reading from file");
}
- if(ret == 0) {
- throw new TTransportException(TTransportException.END_OF_FILE,
- "End of File reached");
+ if (ret == 0) {
+ throw new TTransportException(TTransportException.END_OF_FILE, "End of File reached");
}
got += ret;
}
return got;
}
-
/**
* Reads up to len bytes into buffer buf, starting at offset off.
*
@@ -459,13 +444,11 @@ public class TFileTransport extends TTransport {
* @throws TTransportException if there was an error reading data
*/
public int read(byte[] buf, int off, int len) throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
- "Must open before reading");
+ if (!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN, "Must open before reading");
- if(currentEvent_.getRemaining() == 0) {
- if(!readEvent())
- return(0);
+ if (currentEvent_.getRemaining() == 0) {
+ if (!readEvent()) return (0);
}
int nread = currentEvent_.emit(buf, off, len);
@@ -473,15 +456,12 @@ public class TFileTransport extends TTransport {
}
public int getNumChunks() throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
- "Must open before getNumChunks");
+ if (!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getNumChunks");
try {
long len = inputFile_.length();
- if(len == 0)
- return 0;
- else
- return (((int)(len/cs.getChunkSize())) + 1);
+ if (len == 0) return 0;
+ else return (((int) (len / cs.getChunkSize())) + 1);
} catch (IOException iox) {
throw new TTransportException(iox.getMessage(), iox);
@@ -489,18 +469,14 @@ public class TFileTransport extends TTransport {
}
public int getCurChunk() throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
- "Must open before getCurChunk");
+ if (!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getCurChunk");
return (cs.getChunkNum());
-
}
-
public void seekToChunk(int chunk) throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
- "Must open before seeking");
+ if (!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking");
int numChunks = getNumChunks();
@@ -519,45 +495,47 @@ public class TFileTransport extends TTransport {
chunk = 0;
}
- long eofOffset=0;
+ long eofOffset = 0;
boolean seekToEnd = (chunk >= numChunks);
- if(seekToEnd) {
+ if (seekToEnd) {
chunk = chunk - 1;
- try { eofOffset = inputFile_.length(); }
- catch (IOException iox) {throw new TTransportException(iox.getMessage(),
- iox);}
+ try {
+ eofOffset = inputFile_.length();
+ } catch (IOException iox) {
+ throw new TTransportException(iox.getMessage(), iox);
+ }
}
- if(chunk*cs.getChunkSize() != cs.getOffset()) {
- try { inputFile_.seek((long)chunk*cs.getChunkSize()); }
- catch (IOException iox) {
- throw new TTransportException("Seek to chunk " +
- chunk + " " +iox.getMessage(), iox);
+ if (chunk * cs.getChunkSize() != cs.getOffset()) {
+ try {
+ inputFile_.seek((long) chunk * cs.getChunkSize());
+ } catch (IOException iox) {
+ throw new TTransportException("Seek to chunk " + chunk + " " + iox.getMessage(), iox);
}
- cs.seek((long)chunk*cs.getChunkSize());
+ cs.seek((long) chunk * cs.getChunkSize());
currentEvent_.setAvailable(0);
inputStream_ = createInputStream();
}
- if(seekToEnd) {
+ if (seekToEnd) {
// waiting forever here - otherwise we can hit EOF and end up
// having consumed partial data from the data stream.
TailPolicy old = setTailPolicy(TailPolicy.WAIT_FOREVER);
- while(cs.getOffset() < eofOffset) { readEvent(); }
+ while (cs.getOffset() < eofOffset) {
+ readEvent();
+ }
currentEvent_.setAvailable(0);
setTailPolicy(old);
}
}
public void seekToEnd() throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
- "Must open before seeking");
+ if (!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking");
seekToChunk(getNumChunks());
}
-
/**
* Writes up to len bytes from the buffer.
*
@@ -579,36 +557,30 @@ public class TFileTransport extends TTransport {
throw new TTransportException("Not Supported");
}
-
@Override
public TConfiguration getConfiguration() {
return null;
}
@Override
- public void updateKnownMessageSize(long size) throws TTransportException {
-
- }
+ public void updateKnownMessageSize(long size) throws TTransportException {}
@Override
- public void checkReadBytesAvailable(long numBytes) throws TTransportException {
-
- }
+ public void checkReadBytesAvailable(long numBytes) throws TTransportException {}
- /**
- * test program
- *
- */
+ /** test program */
public static void main(String[] args) throws Exception {
int num_chunks = 10;
- if((args.length < 1) || args[0].equals("--help")
- || args[0].equals("-h") || args[0].equals("-?")) {
+ if ((args.length < 1)
+ || args[0].equals("--help")
+ || args[0].equals("-h")
+ || args[0].equals("-?")) {
printUsage();
}
- if(args.length > 1) {
+ if (args.length > 1) {
try {
num_chunks = Integer.parseInt(args[1]);
} catch (Exception e) {
@@ -619,15 +591,15 @@ public class TFileTransport extends TTransport {
TFileTransport t = new TFileTransport(args[0], true);
t.open();
- LOGGER.info("NumChunks="+t.getNumChunks());
+ LOGGER.info("NumChunks=" + t.getNumChunks());
Random r = new Random();
- for(int j=0; j<num_chunks; j++) {
+ for (int j = 0; j < num_chunks; j++) {
byte[] buf = new byte[4096];
- int cnum = r.nextInt(t.getNumChunks()-1);
- LOGGER.info("Reading chunk "+cnum);
+ int cnum = r.nextInt(t.getNumChunks() - 1);
+ LOGGER.info("Reading chunk " + cnum);
t.seekToChunk(cnum);
- for(int i=0; i<4096; i++) {
+ for (int i = 0; i < 4096; i++) {
t.read(buf, 0, 4096);
}
}
@@ -638,5 +610,4 @@ public class TFileTransport extends TTransport {
LOGGER.error(" (Opens and reads num_chunks chunks from file randomly)");
System.exit(1);
}
-
}