Index: src/core/org/apache/hadoop/util/LineReader.java
===================================================================
--- src/core/org/apache/hadoop/util/LineReader.java (revision 760378)
+++ src/core/org/apache/hadoop/util/LineReader.java (working copy)
@@ -186,5 +186,9 @@
public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
+
+ public long getBufferedDataLength() {
+ return (bufferPosn < bufferLength) ? (bufferLength - bufferPosn) : 0; // bufferLength minus bufferPosn, floored at zero
+ }
}
Index: src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (revision 760378)
+++ src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (working copy)
@@ -20,15 +20,18 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplitEnabledCompressionCodec;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@@ -46,8 +49,14 @@
private long start;
private long pos;
private long end;
- private LineReader in;
+ private LineReader lineReader;
int maxLineLength;
+ private InputStream input = null;
+ private FSDataInputStream fileIn;
+ private enum INPUT_TYPE {SPLITABLE_COMPRESSED,
+ NONSPLITABLE_COMPRESSED,
+ PLAIN};
+ private INPUT_TYPE inputType = INPUT_TYPE.PLAIN;
/**
* A class that provides a line reader from an input stream.
@@ -78,28 +87,65 @@
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
- if (codec != null) {
- in = new LineReader(codec.createInputStream(fileIn), job);
+ fileIn = fs.open(split.getPath());
+ fileIn.seek(start);
+
+ this.setInputType(codec, job);
+
+ switch(this.inputType){
+ case NONSPLITABLE_COMPRESSED:
+
+ this.input = codec.createInputStream(fileIn);
+ lineReader = new LineReader(this.input, job);
end = Long.MAX_VALUE;
- } else {
- fileIn.seek(start);
- in = new LineReader(fileIn, job);
+ break;
+
+
+ case SPLITABLE_COMPRESSED:
+
+ Vector
+ * This Ant code was enhanced so that it can de-compress blocks of bzip2 data. + * Current position in the stream is an important statistic for Hadoop. For + * example in LineRecordReader, we solely depend on the current position in the + * stream to know about the progress. The notion of position becomes complicated + * for compressed files. The Hadoop splitting is done in terms of compressed + * file. But a compressed file inflates to a large amount of data. So we have + * handled this problem in the following way. + * + * On object creation time, we find the next block start delimiter. Once such a + * marker is found, the stream stops there (we discard any read compressed data + * in this process) and the position is updated (i.e. the caller of this class + * will find out the stream location). At this point we are ready for actual + * reading (i.e. decompression) of data. + * + * The subsequent read calls give out data. The position is updated when the + * caller of this class has read off the current block + 1 bytes. In between the + * block reading, position is not updated. (We can only update the position on + * block boundaries). + *
+ * + ** Instances of this class are not threadsafe. *
*/ public class CBZip2InputStream extends InputStream implements BZip2Constants { - private static void reportCRCError() throws IOException { - throw new IOException("BZip2 CRC error"); + public static final long BLOCK_DELIMITER = 0X314159265359L;// start of block + public static final long EOS_DELIMITER = 0X177245385090L;// end of bzip2 stream + private static final int DELIMITER_BIT_LENGTH = 48; + READ_MODE readMode = READ_MODE.CONTINUOUS; + // The variable records the current advertised position of the stream. + private long reportedBytesReadFromCompressedStream = 0L; + // The following variable keep record of compressed bytes read. + private long bytesReadFromCompressedStream = 0L; - } - - private void makeMaps() { - final boolean[] inUse = this.data.inUse; - final byte[] seqToUnseq = this.data.seqToUnseq; - - int nInUseShadow = 0; - - for (int i = 0; i < 256; i++) { - if (inUse[i]) - seqToUnseq[nInUseShadow++] = (byte) i; - } - - this.nInUse = nInUseShadow; - } - /** * Index of the last char in the block, so the block size == last + 1. 
*/ @@ -86,32 +101,30 @@ */ private int blockSize100k; - private boolean blockRandomised; + private boolean blockRandomised = false; - private int bsBuff; - private int bsLive; + private long bsBuff; + private long bsLive; private final CRC crc = new CRC(); private int nInUse; - private InputStream in; + private BufferedInputStream in; private int currentChar = -1; - private static final int EOF = 0; - private static final int START_BLOCK_STATE = 1; - private static final int RAND_PART_A_STATE = 2; - private static final int RAND_PART_B_STATE = 3; - private static final int RAND_PART_C_STATE = 4; - private static final int NO_RAND_PART_A_STATE = 5; - private static final int NO_RAND_PART_B_STATE = 6; - private static final int NO_RAND_PART_C_STATE = 7; + //A state machine to keep track of current state of the de-coder + public enum STATE { + EOF, START_BLOCK_STATE, RAND_PART_A_STATE, RAND_PART_B_STATE, RAND_PART_C_STATE, NO_RAND_PART_A_STATE, NO_RAND_PART_B_STATE, NO_RAND_PART_C_STATE, NO_PROCESS_STATE + }; - private int currentState = START_BLOCK_STATE; + private STATE currentState = STATE.START_BLOCK_STATE; private int storedBlockCRC, storedCombinedCRC; private int computedBlockCRC, computedCombinedCRC; + private boolean skipResult = false;// used by skipToNextMarker + // Variables used by setup* methods exclusively private int su_count; @@ -130,6 +143,104 @@ private CBZip2InputStream.Data data; /** + * This method reports the processed bytes so far. Please note that this + * statistic is only updated on block boundaries and only when the stream is + * initiated in BYBLOCK mode. + */ + public long getProcessedByteCount() { + return reportedBytesReadFromCompressedStream; + } + + public void updateProcessedByteCount(int count) { + this.bytesReadFromCompressedStream += count; + } + + /* + * This method is called by the client of this + * class in case there are any corrections in + * the stream position. 
One common example is + * when client of this code removes starting BZ + * characters from the compressed stream. * + * + */ + public void updateReportedByteCount(int count) { + this.reportedBytesReadFromCompressedStream += count; + this.updateProcessedByteCount(count); + } + + /** + * This method reads a Byte from the compressed stream. Whenever we need to + * read from the underlying compressed stream, this method should be called + * instead of directly calling the read method of the underlying compressed + * stream. This method does important record keeping to have the statistic + * that how many bytes have been read off the compressed stream. + */ + private int readAByte(InputStream inStream) throws IOException { + int read = inStream.read(); + if (read >= 0) { + this.updateProcessedByteCount(1); + } + return read; + } + + /** + * This method tries to find the end of block (EOB) delimiter in the stream, + * starting from the current position of the stream. If EOB is found, the + * stream position will be right after EOB. 
+ * + * @throws Exception + */ + public boolean skipToNextMarker(long marker, int markerBitLength) + throws Exception { + try { + if (markerBitLength > 63) { + throw new Exception( + "skipToNextMarker can not find patterns greater than 63 bits"); + } + // pick next marketBitLength bits in the stream + long bytes = 0; + bytes = this.bsR(markerBitLength); + if (bytes == -1) { + return false; + } + while (true) { + if (bytes == marker) { + return true; + + } else { + bytes = bytes << 1; + bytes = bytes & ((1L << markerBitLength) - 1); + int oneBit = (int) this.bsR(1); + if (oneBit != -1) { + bytes = bytes | oneBit; + } else + return false; + } + } + } catch (IOException ex) { + return false; + } + } + + private static void reportCRCError() throws IOException { + throw new IOException("crc error"); + } + + private void makeMaps() { + final boolean[] inUse = this.data.inUse; + final byte[] seqToUnseq = this.data.seqToUnseq; + + int nInUseShadow = 0; + + for (int i = 0; i < 256; i++) { + if (inUse[i]) + seqToUnseq[nInUseShadow++] = (byte) i; + } + + this.nInUse = nInUseShadow; + } + + /** * Constructs a new CBZip2InputStream which decompresses bytes read from the * specified stream. 
* @@ -145,21 +256,77 @@ * @throws NullPointerException * if in == null */ - public CBZip2InputStream(final InputStream in) throws IOException { + public CBZip2InputStream(final InputStream in, READ_MODE readMode) + throws IOException { + super(); + int blockSize = 0X39;// i.e 9 + this.blockSize100k = blockSize - '0'; + this.in = new BufferedInputStream(in, 1024 * 9);// >1 MB buffer + this.readMode = readMode; + if (readMode == READ_MODE.CONTINUOUS) { + currentState = STATE.START_BLOCK_STATE; + init(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + try { + skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH); + this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream; + changeStateToProcessABlock(); + } catch (Exception e) { + } + } + } - this.in = in; - init(); + public CBZip2InputStream(final InputStream in) throws IOException { + this(in, READ_MODE.CONTINUOUS); } + private void changeStateToProcessABlock() throws IOException { + if (skipResult == true) { + initBlock(); + setupBlock(); + } else { + this.currentState = STATE.EOF; + } + } + + public int read() throws IOException { + if (this.in != null) { - return read0(); + byte array[] = new byte[1]; + int result = this.read(array, 0, 1); + int value = 0X000000FF & array[0]; + return (result > 0 ? value : result); + } else { throw new IOException("stream closed"); } } + /** + * In CONTINOUS reading mode, this read method starts from the + * start of the compressed stream and end at the end of file by + * emitting un-compressed data. In this mode stream positioning + * is not announced and should be ignored. + * + * In BYBLOCK reading mode, this read method informs about the end + * of a BZip2 block by returning EOB. At this event, the compressed + * stream position is also announced. This announcement tells that + * how much of the compressed stream has been de-compressed and read + * out of this class. 
In between EOB events, the stream position is + * not updated. + * + * + * @throws IOException + * if the stream content is malformed or an I/O error occurs. + * + * @return int The return value greater than 0 are the bytes read. A value + * of -1 means end of stream while -2 represents end of block + */ + + public int read(final byte[] dest, final int offs, final int len) throws IOException { if (offs < 0) { @@ -178,11 +345,31 @@ final int hi = offs + len; int destOffs = offs; - for (int b; (destOffs < hi) && ((b = read0()) >= 0);) { + int b = 0; + + + + for (; ((destOffs < hi) && ((b = read0())) >= 0);) { dest[destOffs++] = (byte) b; + } - return (destOffs == offs) ? -1 : (destOffs - offs); + int result = destOffs - offs; + if (result == 0) { + //report 'end of block' or 'end of stream' + result = b; + + + try { + skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH); + //Exactly when we are about to start a new block, we advertise the stream position. 
+ this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream; + + } catch (Exception e) {} + + changeStateToProcessABlock(); + } + return result; } private int read0() throws IOException { @@ -190,8 +377,11 @@ switch (this.currentState) { case EOF: - return -1; + return END_OF_STREAM;// return -1 + case NO_PROCESS_STATE: + return END_OF_BLOCK;// return -2 + case START_BLOCK_STATE: throw new IllegalStateException(); @@ -225,13 +415,13 @@ } private void init() throws IOException { - int magic2 = this.in.read(); + int magic2 = this.readAByte(in); if (magic2 != 'h') { throw new IOException("Stream is not BZip2 formatted: expected 'h'" + " as first byte but got '" + (char) magic2 + "'"); } - int blockSize = this.in.read(); + int blockSize = this.readAByte(in); if ((blockSize < '1') || (blockSize > '9')) { throw new IOException("Stream is not BZip2 formatted: illegal " + "blocksize " + (char) blockSize); @@ -244,6 +434,27 @@ } private void initBlock() throws IOException { + if (this.readMode == READ_MODE.BYBLOCK) { + // this.checkBlockIntegrity(); + this.storedBlockCRC = bsGetInt(); + this.blockRandomised = bsR(1) == 1; + + /** + * Allocate data here instead in constructor, so we do not allocate + * it if the input file is empty. 
+ */ + if (this.data == null) { + this.data = new Data(this.blockSize100k); + } + + // currBlockNo++; + getAndMoveToFrontDecode(); + + this.crc.initialiseCRC(); + this.currentState = STATE.START_BLOCK_STATE; + return; + } + char magic0 = bsGetUByte(); char magic1 = bsGetUByte(); char magic2 = bsGetUByte(); @@ -261,7 +472,7 @@ magic4 != 0x53 || // 'S' magic5 != 0x59 // 'Y' ) { - this.currentState = EOF; + this.currentState = STATE.EOF; throw new IOException("bad block header"); } else { this.storedBlockCRC = bsGetInt(); @@ -279,7 +490,7 @@ getAndMoveToFrontDecode(); this.crc.initialiseCRC(); - this.currentState = START_BLOCK_STATE; + this.currentState = STATE.START_BLOCK_STATE; } } @@ -295,6 +506,7 @@ this.computedCombinedCRC ^= this.storedBlockCRC; reportCRCError(); + } this.computedCombinedCRC = (this.computedCombinedCRC << 1) @@ -304,7 +516,7 @@ private void complete() throws IOException { this.storedCombinedCRC = bsGetInt(); - this.currentState = EOF; + this.currentState = STATE.EOF; this.data = null; if (this.storedCombinedCRC != this.computedCombinedCRC) { @@ -326,14 +538,14 @@ } } - private int bsR(final int n) throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + private long bsR(final long n) throws IOException { + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < n) { final InputStream inShadow = this.in; do { - int thech = inShadow.read(); + int thech = readAByte(inShadow); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -347,15 +559,16 @@ } this.bsLive = bsLiveShadow - n; - return (bsBuffShadow >> (bsLiveShadow - n)) & ((1 << n) - 1); + final long one = 1; + return (bsBuffShadow >> (bsLiveShadow - n)) & ((one << n) - 1); } private boolean bsGetBit() throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < 1) { - int thech = this.in.read(); 
+ int thech = this.readAByte(in); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -375,7 +588,7 @@ } private int bsGetInt() throws IOException { - return (((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8); + return (int) ((((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8)); } /** @@ -454,8 +667,8 @@ final int alphaSize = this.nInUse + 2; /* Now the selectors */ - final int nGroups = bsR(3); - final int nSelectors = bsR(15); + final int nGroups = (int) bsR(3); + final int nSelectors = (int) bsR(15); for (int i = 0; i < nSelectors; i++) { int j = 0; @@ -486,7 +699,7 @@ /* Now the coding tables */ for (int t = 0; t < nGroups; t++) { - int curr = bsR(5); + int curr = (int) bsR(5); final char[] len_t = len[t]; for (int i = 0; i < alphaSize; i++) { while (bsGetBit()) { @@ -532,7 +745,7 @@ } private void getAndMoveToFrontDecode() throws IOException { - this.origPtr = bsR(24); + this.origPtr = (int) bsR(24); recvDecodingTables(); final InputStream inShadow = this.in; @@ -562,8 +775,8 @@ int groupPos = G_SIZE - 1; final int eob = this.nInUse + 1; int nextSym = getAndMoveToFrontDecode0(0); - int bsBuffShadow = this.bsBuff; - int bsLiveShadow = this.bsLive; + int bsBuffShadow = (int) this.bsBuff; + int bsLiveShadow = (int) this.bsLive; int lastShadow = -1; int zt = selector[groupNo] & 0xff; int[] base_zt = base[zt]; @@ -597,10 +810,8 @@ int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -609,14 +820,14 @@ throw new IOException("unexpected end of stream"); } } - int zvec = (bsBuffShadow >> (bsLiveShadow - zn)) + long zvec = (bsBuffShadow >> (bsLiveShadow - zn)) & ((1 << zn) - 1); bsLiveShadow -= zn; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if 
(thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -630,7 +841,7 @@ zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); } - nextSym = perm_zt[zvec - base_zt[zn]]; + nextSym = perm_zt[(int) (zvec - base_zt[zn])]; } final byte ch = seqToUnseq[yy[0]]; @@ -680,10 +891,8 @@ int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -692,14 +901,14 @@ throw new IOException("unexpected end of stream"); } } - int zvec = (bsBuffShadow >> (bsLiveShadow - zn)) + int zvec = (int) (bsBuffShadow >> (bsLiveShadow - zn)) & ((1 << zn) - 1); bsLiveShadow -= zn; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -709,7 +918,7 @@ } } bsLiveShadow--; - zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); + zvec = (int) ((zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1)); } nextSym = perm_zt[zvec - base_zt[zn]]; } @@ -726,14 +935,14 @@ final int zt = dataShadow.selector[groupNo] & 0xff; final int[] limit_zt = dataShadow.limit[zt]; int zn = dataShadow.minLens[zt]; - int zvec = bsR(zn); - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + int zvec = (int) bsR(zn); + int bsLiveShadow = (int) this.bsLive; + int bsBuffShadow = (int) this.bsBuff; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; @@ -807,12 +1016,16 @@ this.su_ch2 = su_ch2Shadow ^= (this.su_rNToGo == 1) ? 
1 : 0; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = RAND_PART_B_STATE; + this.currentState = STATE.RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { endBlock(); - initBlock(); - setupBlock(); + if (readMode == READ_MODE.CONTINUOUS) { + initBlock(); + setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } @@ -824,19 +1037,23 @@ this.su_tPos = this.data.tt[this.su_tPos]; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = NO_RAND_PART_B_STATE; + this.currentState = STATE.NO_RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { - this.currentState = NO_RAND_PART_A_STATE; + this.currentState = STATE.NO_RAND_PART_A_STATE; endBlock(); - initBlock(); - setupBlock(); + if (readMode == READ_MODE.CONTINUOUS) { + initBlock(); + setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } private void setupRandPartB() throws IOException { if (this.su_ch2 != this.su_chPrev) { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_count = 1; setupRandPartA(); } else if (++this.su_count >= 4) { @@ -851,13 +1068,13 @@ this.su_rNToGo--; } this.su_j2 = 0; - this.currentState = RAND_PART_C_STATE; + this.currentState = STATE.RAND_PART_C_STATE; if (this.su_rNToGo == 1) { this.su_z ^= 1; } setupRandPartC(); } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; setupRandPartA(); } } @@ -868,7 +1085,7 @@ this.crc.updateCRC(this.su_ch2); this.su_j2++; } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_i2++; this.su_count = 0; setupRandPartA(); @@ -895,7 +1112,7 @@ this.currentChar = su_ch2Shadow; this.crc.updateCRC(su_ch2Shadow); this.su_j2++; - this.currentState = NO_RAND_PART_C_STATE; + this.currentState = STATE.NO_RAND_PART_C_STATE; } else { this.su_i2++; this.su_count = 0; Index: 
src/core/org/apache/hadoop/io/compress/SplitEnabledCompressionCodec.java =================================================================== --- src/core/org/apache/hadoop/io/compress/SplitEnabledCompressionCodec.java (revision 0) +++ src/core/org/apache/hadoop/io/compress/SplitEnabledCompressionCodec.java (revision 0) @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Vector; + +import org.apache.hadoop.conf.Configuration; + + +/** + * This interface is meant to be implemented by those compression codecs + * which are capable to compress / de-compress a stream starting at any + * arbitrary position. + * + * Especially the process of de-compressing a stream starting at some arbitrary + * position is challenging. Most of the codecs are only able to successfully + * de-compress a stream, if they start from the very beginning till the end. + * One of the reasons is the stored state at the beginning of the stream which + * is crucial for de-compression. 
+ * + * Yet there are few codecs which do not save the whole state at the beginning + * of the stream and hence can be used to de-compress stream starting at any + * arbitrary points. This interface is meant to be used by such codecs. Such + * codecs are highly valuable, especially in the context of Hadoop, because + * an input compressed file can be split and hence can be worked on by multiple + * machines in parallel. + * @since Hadoop Release 0.19.2 + */ + +public interface SplitEnabledCompressionCodec extends CompressionCodec { + + /** + * During decompression, data can be read off from the decompressor in + * two modes, namely continuous and blocked. Few codecs (e.g. BZip2) are capable of + * compressing data in blocks and then decompressing the blocks. In Blocked reading + * mode codecs inform 'end of block' events to its caller. While in continuous mode, + * the caller of codecs is unaware about the blocks and uncompressed data is spilled + * out like a continuous stream. + */ + public enum READ_MODE {CONTINUOUS, BYBLOCK}; + + + /** + * Create a stream decompressor that will read from the given input stream. The + * mode of reading is dictated by readMode. + * + * @param in the stream to read compressed bytes from + * @param readMode Read mode could be either Continuous or Blocked + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + CompressionInputStream createInputStream(InputStream in, READ_MODE readMode) throws IOException; + + + /** + * This method gives codecs an opportunity to change or shift upper and lower + * ends of a split, if needed by the codecs. The first entry of the vector + * of Integer is the start of split while the second Integer is the end of + * boundary limit. Codecs might change one or both of these limits. It is + * expected that codecs will return true if either of the two limits are + * changed. This is to signal the caller of this method that either of those + * limits have changed.
+ * @param splitLimits This vector has two entries. Split start and end limit + * @return true if either of the limits are changed + */ + boolean adjustSplitBounds(Vector