Index: src/core/org/apache/hadoop/util/LineReader.java =================================================================== --- src/core/org/apache/hadoop/util/LineReader.java (revision 760378) +++ src/core/org/apache/hadoop/util/LineReader.java (working copy) @@ -186,5 +186,9 @@ public int readLine(Text str) throws IOException { return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); } + + public long getBufferedDataLength(){ + return (bufferPosn >= bufferLength? 0: bufferLength - bufferPosn); + } } Index: src/mapred/org/apache/hadoop/mapred/LineRecordReader.java =================================================================== --- src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (revision 760378) +++ src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (working copy) @@ -20,15 +20,18 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Vector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.SplitEnabledCompressionCodec; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.Log; @@ -46,8 +49,14 @@ private long start; private long pos; private long end; - private LineReader in; + private LineReader lineReader; int maxLineLength; + private InputStream input = null; + private FSDataInputStream fileIn; + private enum INPUT_TYPE {SPLITABLE_COMPRESSED, + NONSPLITABLE_COMPRESSED, + PLAIN}; + private INPUT_TYPE inputType = INPUT_TYPE.PLAIN; /** * A class that provides a line reader from an input stream. @@ -78,28 +87,65 @@ // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); - FSDataInputStream fileIn = fs.open(split.getPath()); - if (codec != null) { - in = new LineReader(codec.createInputStream(fileIn), job); + fileIn = fs.open(split.getPath()); + fileIn.seek(start); + + this.setInputType(codec, job); + + switch(this.inputType){ + case NONSPLITABLE_COMPRESSED: + + this.input = codec.createInputStream(fileIn); + lineReader = new LineReader(this.input, job); end = Long.MAX_VALUE; - } else { - fileIn.seek(start); - in = new LineReader(fileIn, job); + break; + + + case SPLITABLE_COMPRESSED: + + Vector splitLimits = new Vector(); + splitLimits.add(0, start); + splitLimits.add(1, end); + + if(((SplitEnabledCompressionCodec)codec).adjustSplitBounds(splitLimits)){ + //Codecs have changed start / end. So we need to update them. + start = splitLimits.get(0); + end = splitLimits.get(1); + fileIn.seek(start); + } + + this.input = ((SplitEnabledCompressionCodec)codec).createInputStream(fileIn, SplitEnabledCompressionCodec.READ_MODE.BYBLOCK); + lineReader = new LineReader(this.input, job); + break; + + + case PLAIN: + + input = new org.apache.hadoop.fs.FSDataInputStream(fileIn) { + + public long getPos() throws IOException{ + return ((Seekable)this.in).getPos() - lineReader.getBufferedDataLength(); + } + + }; + + lineReader = new LineReader(this.input, job); + break; } + + // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { - start += in.readLine(new Text(), 0, (int) Math.min( - (long) Integer.MAX_VALUE, end - start)); + lineReader.readLine(new Text(), maxLineLength, Integer.MAX_VALUE); } - this.pos = start; } - + public LineRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) { this.maxLineLength = maxLineLength; - this.in = new LineReader(in); + this.lineReader = new LineReader(in); this.start = offset; this.pos = offset; this.end = endOffset; @@ -110,7 +156,7 @@ throws IOException{ this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); - this.in = new LineReader(in, job); + this.lineReader = new LineReader(in, job); this.start = offset; this.pos = offset; this.end = endOffset; @@ -130,16 +176,15 @@ // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) + pos = this.getPos(); + while (pos <= end) { key.set(pos); - int newSize = in.readLine(value, maxLineLength, - Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), - maxLineLength)); + int newSize = lineReader.readLine(value, maxLineLength, Integer.MAX_VALUE); if (newSize == 0) { return false; } - pos += newSize; if (newSize < maxLineLength) { return true; } @@ -154,21 +199,37 @@ /** * Get the progress within the split */ - public float getProgress() { + public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { + pos = this.getPos(); return Math.min(1.0f, (pos - start) / (float)(end - start)); } } public synchronized long getPos() throws IOException { - return pos; + return ((Seekable)input).getPos(); } public synchronized void close() throws IOException { - if (in != null) { - in.close(); + if (lineReader != null) { + lineReader.close(); } } + private void setInputType(CompressionCodec codec, Configuration jobConf){ + if(codec == null){ + this.inputType = INPUT_TYPE.PLAIN; + } + else if(!(codec instanceof SplitEnabledCompressionCodec)){ + this.inputType = INPUT_TYPE.NONSPLITABLE_COMPRESSED; + } + else if(((SplitEnabledCompressionCodec)codec).shouldSplitInput(jobConf)){ + this.inputType = INPUT_TYPE.SPLITABLE_COMPRESSED; + } + else{ + this.inputType = INPUT_TYPE.NONSPLITABLE_COMPRESSED; + } + + } } Index: src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java =================================================================== --- src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (revision 760378) +++ src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (working copy) @@ -100,8 +100,8 @@ } return true; } - - public float getProgress() { + + public float getProgress() throws IOException { return lineRecordReader.getProgress(); } Index: src/test/org/apache/hadoop/io/compress/TestCodec.java =================================================================== --- src/test/org/apache/hadoop/io/compress/TestCodec.java (revision 760378) +++ src/test/org/apache/hadoop/io/compress/TestCodec.java (working copy) @@ -41,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.SplitEnabledCompressionCodec.READ_MODE; import org.apache.hadoop.io.compress.zlib.ZlibFactory; public class TestCodec extends TestCase { @@ -59,9 +60,9 @@ public void testGzipCodec() throws IOException { codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec"); } - - public void testBZip2Codec() throws IOException { - codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec"); + + public void testBZip2Codec() throws IOException { + codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec"); } private static void codecTest(Configuration conf, int seed, int count, @@ -110,9 +111,12 @@ DataInputBuffer deCompressedDataBuffer = new DataInputBuffer(); deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, compressedDataBuffer.getLength()); - CompressionInputStream inflateFilter = - codec.createInputStream(deCompressedDataBuffer); - DataInputStream inflateIn = + CompressionInputStream inflateFilter = null; + inflateFilter = ((codecClass.compareTo("org.apache.hadoop.io.compress.BZip2Codec")==0) + ?((SplitEnabledCompressionCodec)codec).createInputStream(deCompressedDataBuffer, READ_MODE.BYBLOCK) + :codec.createInputStream(deCompressedDataBuffer)); + + DataInputStream inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter)); // Check Index: src/core/org/apache/hadoop/io/compress/CompressionInputStream.java =================================================================== --- src/core/org/apache/hadoop/io/compress/CompressionInputStream.java (revision 760378) +++ src/core/org/apache/hadoop/io/compress/CompressionInputStream.java (working copy) @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InputStream; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; /** * A compression input stream. * @@ -28,11 +30,13 @@ * reposition the underlying input stream then call {@link #resetState()}, * without having to also synchronize client buffers. */ -public abstract class CompressionInputStream extends InputStream { + +public abstract class CompressionInputStream extends InputStream implements Seekable { /** * The input stream to be compressed. */ protected final InputStream in; + protected long maxAvailableData = 0L; /** * Create a compression input stream that reads @@ -41,6 +45,12 @@ * @param in The input stream to be compressed. */ protected CompressionInputStream(InputStream in) { + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + try { + this.maxAvailableData = in.available(); + } catch (IOException e) { + } + } this.in = in; } @@ -59,5 +69,25 @@ * as the underlying stream may have been repositioned. */ public abstract void resetState() throws IOException; - + + public long getPos() throws IOException { + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){ + //This way of getting the current position will not work for file + //size which can be fit in an int and hence can not be returned by + //available method. + return (this.maxAvailableData - this.in.available()); + } + else{ + return ((Seekable)this.in).getPos(); + } + + } + + public void seek(long pos) throws IOException { + throw new UnsupportedOperationException(); + } + + public boolean seekToNewSource(long targetPos) throws IOException { + throw new UnsupportedOperationException(); + } } Index: src/core/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java =================================================================== --- src/core/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java (revision 760378) +++ src/core/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java (working copy) @@ -44,6 +44,8 @@ int N_ITERS = 4; int MAX_SELECTORS = (2 + (900000 / G_SIZE)); int NUM_OVERSHOOT_BYTES = 20; + public static final int END_OF_BLOCK = -2; + public static final int END_OF_STREAM = -1; /** * This array really shouldn't be here. Again, for historical purposes it Index: src/core/org/apache/hadoop/io/compress/BZip2Codec.java =================================================================== --- src/core/org/apache/hadoop/io/compress/BZip2Codec.java (revision 760378) +++ src/core/org/apache/hadoop/io/compress/BZip2Codec.java (working copy) @@ -22,7 +22,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Vector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.bzip2.BZip2Constants; import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor; import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor; import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; @@ -36,10 +39,13 @@ * UnsupportedOperationException. */ public class BZip2Codec implements - org.apache.hadoop.io.compress.CompressionCodec { + org.apache.hadoop.io.compress.SplitEnabledCompressionCodec { private static final String HEADER = "BZ"; private static final int HEADER_LEN = HEADER.length(); + private static final String SUB_HEADER = "h9"; + private static final int SUB_HEADER_LEN = SUB_HEADER.length(); + private static final int BZIP2_BLOCK_MARKER_LENGTH = 6; /** * Creates a new instance of BZip2Codec @@ -62,11 +68,11 @@ } /** - * This functionality is currently not supported. - * - * @throws java.lang.UnsupportedOperationException - * Throws UnsupportedOperationException - */ + * Creates a compressor using given OutputStream. + * + * @return CompressionOutputStream + @throws java.io.IOException + */ public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException { return createOutputStream(out); @@ -118,6 +124,26 @@ } /** + * Creates CompressionInputStream to be used to read off uncompressed data + * in one of the two reading modes. i.e. Continuous or Blocked reading modes + * + * @param in + * The InputStream + * @param readMode + * The reading mode + * + * @return Returns CompressionInputStream for BZip2 + * @throws java.io.IOException + * Throws IOException + */ + public CompressionInputStream createInputStream(InputStream in, + READ_MODE readMode) throws IOException { + CompressionInputStream compressionInputStream = null; + compressionInputStream = new BZip2CompressionInputStream(in, readMode); + return compressionInputStream; + } + + /** * This functionality is currently not supported. * * @throws java.lang.UnsupportedOperationException @@ -146,8 +172,39 @@ return ".bz2"; } - private static class BZip2CompressionOutputStream extends CompressionOutputStream { + /** + * BZip2 codecs must pull back 6 bytes (i.e. the number of bytes in a BZip2 + * marker to handle the cases when split start is exactly on the marker. + * This method is only meaningful when codecs are used in BY_BLOCK mode. * + * + */ + public boolean adjustSplitBounds(Vector splitLimits) { + long start = splitLimits.get(0); + if (start >= BZIP2_BLOCK_MARKER_LENGTH) { + start -= BZIP2_BLOCK_MARKER_LENGTH; + } + + // Since value of start is changed,put new val in splitLimits + splitLimits.set(0, start); + return true; + } + + /** + * bzip2 can work with Hadoop generated file splits. For that bzip2 codecs + * must be used in Blocked mode while creating CompressionInputStream + * + * @return boolean true + */ + public boolean shouldSplitInput(Configuration jobConf) { + String jobConfSplitSwitch = jobConf.get("mapred.input.codec.nosplitting"); + if (jobConfSplitSwitch == null) jobConfSplitSwitch = ""; + return (jobConfSplitSwitch.compareToIgnoreCase("true") == 0)?false:true; + } + + private static class BZip2CompressionOutputStream extends + CompressionOutputStream { + // class data starts here// private CBZip2OutputStream output; private boolean needsReset; @@ -209,26 +266,79 @@ }// end of class BZip2CompressionOutputStream - private static class BZip2CompressionInputStream extends CompressionInputStream { + /** + * This class is capable to de-compress BZip2 data in two modes; + * CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to + * do decompression starting any arbitrary position in the stream. + * + * So this facility can easily be used to parallelize decompression + * of a large BZip2 file for performance reasons. (It is exactly + * done so for Hadoop framework. See LineRecordReader for an + * example). So one can break the file (of course logically) into + * chunks for parallel processing. These "splits" should be like + * default Hadoop splits (e.g as in FileInputFormat getSplit metod). + * So this code is designed and tested for FileInputFormat's way + * of splitting only. + */ + private static class BZip2CompressionInputStream extends + CompressionInputStream { + // class data starts here// private CBZip2InputStream input; boolean needsReset; + private BufferedInputStream bufferedIn; + private boolean isHeaderStripped = false; + private boolean isSubHeaderStripped = false; + private READ_MODE readMode = READ_MODE.CONTINUOUS; + private long startingPos = 0L; + + // Following state machine handles different states of compressed stream + // position + // HOLD : Don't advertise compressed stream position + // ADVERTISE : Read 1 more character and advertise stream position + // See more comments about it before updatePos method. + private enum POS_ADVERTISEMENT_STATE_MACHINE { + HOLD, ADVERTISE + }; + + POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; + long compressedStreamPosition = 0; + // class data ends here// public BZip2CompressionInputStream(InputStream in) throws IOException { + this(in, READ_MODE.CONTINUOUS); + } + public BZip2CompressionInputStream(InputStream in, READ_MODE readMode) + throws IOException { super(in); - needsReset = true; + needsReset = false; + bufferedIn = new BufferedInputStream(super.in); + this.startingPos = super.getPos(); + this.readMode = readMode; + if (this.startingPos == 0) { + // We only strip header if it is start of file + bufferedIn = readStreamHeader(); + } + input = new CBZip2InputStream(bufferedIn, readMode); + if (this.isHeaderStripped) { + input.updateReportedByteCount(HEADER_LEN); + } + + if (this.isSubHeaderStripped) { + input.updateReportedByteCount(SUB_HEADER_LEN); + } + + this.updatePos(false); } private BufferedInputStream readStreamHeader() throws IOException { // We are flexible enough to allow the compressed stream not to // start with the header of BZ. So it works fine either we have - // the header or not. - BufferedInputStream bufferedIn = null; + // the header or not. if (super.in != null) { - bufferedIn = new BufferedInputStream(super.in); bufferedIn.mark(HEADER_LEN); byte[] headerBytes = new byte[HEADER_LEN]; int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN); @@ -236,6 +346,17 @@ String header = new String(headerBytes); if (header.compareTo(HEADER) != 0) { bufferedIn.reset(); + } else { + this.isHeaderStripped = true; + // In case of BYBLOCK mode, we also want to strip off + // remaining two character of the header. + if (this.readMode == READ_MODE.BYBLOCK) { + actualRead = bufferedIn.read(headerBytes, 0, + SUB_HEADER_LEN); + if (actualRead != -1) { + this.isSubHeaderStripped = true; + } + } } } } @@ -255,35 +376,98 @@ } } + /** + * This method updates compressed stream position exactly when the + * client of this code has read off at least one byte passed any BZip2 + * end of block marker. + * + * This mechanism is very helpful to deal with data level record + * boundaries. Please see constructor and next methods of + * org.apache.hadoop.mapred.LineRecordReader as an example usage of this + * feature. We elaborate it with an example in the following: + * + * Assume two different scenarios of the BZip2 compressed stream, where + * [m] represent end of block, \n is line delimiter and . represent compressed + * data. + * + * ............[m]......\n....... + * + * ..........\n[m]......\n....... + * + * Assume that end is right after [m]. In the first case the reading + * will stop at \n and there is no need to read one more line. (To see the + * reason of reading one more line in the next() method is explained in LineRecordReader.) + * While in the second example LineRecordReader needs to read one more line + * (till the second \n). Now since BZip2Codecs only update position + * at least one byte passed a maker, so it is straight forward to differentiate + * between the two cases mentioned. + * + */ + public int read(byte[] b, int off, int len) throws IOException { if (needsReset) { internalReset(); } - return this.input.read(b, off, len); + int result = 0; + result = this.input.read(b, off, len); + if (result == BZip2Constants.END_OF_BLOCK) { + this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE; + } + + if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) { + result = this.input.read(b, off, off + 1); + // This is the precise time to update compressed stream position + // to the client of this code. + this.updatePos(true); + this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; + } + + return result; + } + public int read() throws IOException { + byte b[] = new byte[1]; + int result = this.read(b, 0, 1); + return (result < 0) ? result : b[0]; + } + private void internalReset() throws IOException { if (needsReset) { needsReset = false; BufferedInputStream bufferedIn = readStreamHeader(); - input = new CBZip2InputStream(bufferedIn); + input = new CBZip2InputStream(bufferedIn, this.readMode); } - } - + } + public void resetState() throws IOException { - // Cannot read from bufferedIn at this point because bufferedIn might not be ready + // Cannot read from bufferedIn at this point because bufferedIn + // might not be ready // yet, as in SequenceFile.Reader implementation. needsReset = true; } - public int read() throws IOException { - if (needsReset) { - internalReset(); - } - return this.input.read(); + public long getPos() { + return this.compressedStreamPosition; } + /* + * As the comments before read method tell that + * compressed stream is advertised when at least + * one byte passed EOB have been read off. But + * there is an exception to this rule. When we + * construct the stream we advertise the position + * exactly at EOB. In the following method + * shouldAddOn boolean captures this exception. + * + */ + private void updatePos(boolean shouldAddOn) { + int addOn = shouldAddOn ? 1 : 0; + this.compressedStreamPosition = this.startingPos + + this.input.getProcessedByteCount() + addOn; + } + }// end of BZip2CompressionInputStream } Index: src/mapred/org/apache/hadoop/mapred/TextInputFormat.java =================================================================== --- src/mapred/org/apache/hadoop/mapred/TextInputFormat.java (revision 760378) +++ src/mapred/org/apache/hadoop/mapred/TextInputFormat.java (working copy) @@ -20,6 +20,7 @@ import java.io.*; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -36,13 +37,25 @@ implements JobConfigurable { private CompressionCodecFactory compressionCodecs = null; - + private Configuration conf = new Configuration(); + public void configure(JobConf conf) { compressionCodecs = new CompressionCodecFactory(conf); + this.conf=conf; } protected boolean isSplitable(FileSystem fs, Path file) { - return compressionCodecs.getCodec(file) == null; + CompressionCodec codecs = compressionCodecs.getCodec(file); + if(codecs == null){ + return true; + } + else if(!(codecs instanceof SplitEnabledCompressionCodec)){ + return false; + } + else{ + return ((SplitEnabledCompressionCodec)codecs).shouldSplitInput(this.conf); + } + } public RecordReader getRecordReader( Index: src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java =================================================================== --- src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java (revision 760378) +++ src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java (working copy) @@ -23,9 +23,13 @@ */ package org.apache.hadoop.io.compress.bzip2; +import java.io.BufferedInputStream; import java.io.InputStream; import java.io.IOException; +import org.apache.hadoop.io.compress.SplitEnabledCompressionCodec.READ_MODE; + + /** * An input stream that decompresses from the BZip2 format (without the file * header chars) to be read as any other stream. @@ -45,31 +49,42 @@ *

* *

+ * This Ant code was enhanced so that it can de-compress blocks of bzip2 data. + * Current position in the stream is an important statistic for Hadoop. For + * example in LineRecordReader, we solely depend on the current position in the + * stream to know about the progess. The notion of position becomes complicated + * for compressed files. The Hadoop splitting is done in terms of compressed + * file. But a compressed file deflates to a large amount of data. So we have + * handled this problem in the following way. + * + * On object creation time, we find the next block start delimiter. Once such a + * marker is found, the stream stops there (we discard any read compressed data + * in this process) and the position is updated (i.e. the caller of this class + * will find out the stream location). At this point we are ready for actual + * reading (i.e. decompression) of data. + * + * The subsequent read calls give out data. The position is updated when the + * caller of this class has read off the current block + 1 bytes. In between the + * block reading, position is not updated. (We can only update the postion on + * block boundaries). + *

+ * + *

* Instances of this class are not threadsafe. *

*/ public class CBZip2InputStream extends InputStream implements BZip2Constants { - private static void reportCRCError() throws IOException { - throw new IOException("BZip2 CRC error"); + public static final long BLOCK_DELIMITER = 0X314159265359L;// start of block + public static final long EOS_DELIMITER = 0X177245385090L;// end of bzip2 stream + private static final int DELIMITER_BIT_LENGTH = 48; + READ_MODE readMode = READ_MODE.CONTINUOUS; + // The variable records the current advertised position of the stream. + private long reportedBytesReadFromCompressedStream = 0L; + // The following variable keep record of compressed bytes read. + private long bytesReadFromCompressedStream = 0L; - } - - private void makeMaps() { - final boolean[] inUse = this.data.inUse; - final byte[] seqToUnseq = this.data.seqToUnseq; - - int nInUseShadow = 0; - - for (int i = 0; i < 256; i++) { - if (inUse[i]) - seqToUnseq[nInUseShadow++] = (byte) i; - } - - this.nInUse = nInUseShadow; - } - /** * Index of the last char in the block, so the block size == last + 1. */ @@ -86,32 +101,30 @@ */ private int blockSize100k; - private boolean blockRandomised; + private boolean blockRandomised = false; - private int bsBuff; - private int bsLive; + private long bsBuff; + private long bsLive; private final CRC crc = new CRC(); private int nInUse; - private InputStream in; + private BufferedInputStream in; private int currentChar = -1; - private static final int EOF = 0; - private static final int START_BLOCK_STATE = 1; - private static final int RAND_PART_A_STATE = 2; - private static final int RAND_PART_B_STATE = 3; - private static final int RAND_PART_C_STATE = 4; - private static final int NO_RAND_PART_A_STATE = 5; - private static final int NO_RAND_PART_B_STATE = 6; - private static final int NO_RAND_PART_C_STATE = 7; + //A state machine to keep track of current state of the de-coder + public enum STATE { + EOF, START_BLOCK_STATE, RAND_PART_A_STATE, RAND_PART_B_STATE, RAND_PART_C_STATE, NO_RAND_PART_A_STATE, NO_RAND_PART_B_STATE, NO_RAND_PART_C_STATE, NO_PROCESS_STATE + }; - private int currentState = START_BLOCK_STATE; + private STATE currentState = STATE.START_BLOCK_STATE; private int storedBlockCRC, storedCombinedCRC; private int computedBlockCRC, computedCombinedCRC; + private boolean skipResult = false;// used by skipToNextMarker + // Variables used by setup* methods exclusively private int su_count; @@ -130,6 +143,104 @@ private CBZip2InputStream.Data data; /** + * This method reports the processed bytes so far. Please note that this + * statistic is only updated on block boundaries and only when the stream is + * initiated in BYBLOCK mode. + */ + public long getProcessedByteCount() { + return reportedBytesReadFromCompressedStream; + } + + public void updateProcessedByteCount(int count) { + this.bytesReadFromCompressedStream += count; + } + + /* + * This method is called by the client of this + * class in case there are any corrections in + * the stream position. One common example is + * when client of this code removes starting BZ + * characters from the compressed stream. * + * + */ + public void updateReportedByteCount(int count) { + this.reportedBytesReadFromCompressedStream += count; + this.updateProcessedByteCount(count); + } + + /** + * This method reads a Byte from the compressed stream. Whenever we need to + * read from the underlying compressed stream, this method should be called + * instead of directly calling the read method of the underlying compressed + * stream. This method does important record keeping to have the statistic + * that how many bytes have been read off the compressed stream. + */ + private int readAByte(InputStream inStream) throws IOException { + int read = inStream.read(); + if (read >= 0) { + this.updateProcessedByteCount(1); + } + return read; + } + + /** + * This method tries to find the end of block (EOB) delimiter in the stream, + * starting from the current position of the stream. If EOB is found, the + * stream position will be right after EOB. + * + * @throws Exception + */ + public boolean skipToNextMarker(long marker, int markerBitLength) + throws Exception { + try { + if (markerBitLength > 63) { + throw new Exception( + "skipToNextMarker can not find patterns greater than 63 bits"); + } + // pick next marketBitLength bits in the stream + long bytes = 0; + bytes = this.bsR(markerBitLength); + if (bytes == -1) { + return false; + } + while (true) { + if (bytes == marker) { + return true; + + } else { + bytes = bytes << 1; + bytes = bytes & ((1L << markerBitLength) - 1); + int oneBit = (int) this.bsR(1); + if (oneBit != -1) { + bytes = bytes | oneBit; + } else + return false; + } + } + } catch (IOException ex) { + return false; + } + } + + private static void reportCRCError() throws IOException { + throw new IOException("crc error"); + } + + private void makeMaps() { + final boolean[] inUse = this.data.inUse; + final byte[] seqToUnseq = this.data.seqToUnseq; + + int nInUseShadow = 0; + + for (int i = 0; i < 256; i++) { + if (inUse[i]) + seqToUnseq[nInUseShadow++] = (byte) i; + } + + this.nInUse = nInUseShadow; + } + + /** * Constructs a new CBZip2InputStream which decompresses bytes read from the * specified stream. * @@ -145,21 +256,77 @@ * @throws NullPointerException * if in == null */ - public CBZip2InputStream(final InputStream in) throws IOException { + public CBZip2InputStream(final InputStream in, READ_MODE readMode) + throws IOException { + super(); + int blockSize = 0X39;// i.e 9 + this.blockSize100k = blockSize - '0'; + this.in = new BufferedInputStream(in, 1024 * 9);// >1 MB buffer + this.readMode = readMode; + if (readMode == READ_MODE.CONTINUOUS) { + currentState = STATE.START_BLOCK_STATE; + init(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + try { + skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH); + this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream; + changeStateToProcessABlock(); + } catch (Exception e) { + } + } + } - this.in = in; - init(); + public CBZip2InputStream(final InputStream in) throws IOException { + this(in, READ_MODE.CONTINUOUS); } + private void changeStateToProcessABlock() throws IOException { + if (skipResult == true) { + initBlock(); + setupBlock(); + } else { + this.currentState = STATE.EOF; + } + } + + public int read() throws IOException { + if (this.in != null) { - return read0(); + byte array[] = new byte[1]; + int result = this.read(array, 0, 1); + int value = 0X000000FF & array[0]; + return (result > 0 ? value : result); + } else { throw new IOException("stream closed"); } } + /** + * In CONTINOUS reading mode, this read method starts from the + * start of the compressed stream and end at the end of file by + * emitting un-compressed data. In this mode stream positioning + * is not announced and should be ignored. + * + * In BYBLOCK reading mode, this read method informs about the end + * of a BZip2 block by returning EOB. At this event, the compressed + * stream position is also announced. This announcement tells that + * how much of the compressed stream has been de-compressed and read + * out of this class. In between EOB events, the stream position is + * not updated. + * + * + * @throws IOException + * if the stream content is malformed or an I/O error occurs. + * + * @return int The return value greater than 0 are the bytes read. A value + * of -1 means end of stream while -2 represents end of block + */ + + public int read(final byte[] dest, final int offs, final int len) throws IOException { if (offs < 0) { @@ -178,11 +345,31 @@ final int hi = offs + len; int destOffs = offs; - for (int b; (destOffs < hi) && ((b = read0()) >= 0);) { + int b = 0; + + + + for (; ((destOffs < hi) && ((b = read0())) >= 0);) { dest[destOffs++] = (byte) b; + } - return (destOffs == offs) ? -1 : (destOffs - offs); + int result = destOffs - offs; + if (result == 0) { + //report 'end of block' or 'end of stream' + result = b; + + + try { + skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH); + //Exactly when we are about to start a new block, we advertise the stream position. + this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream; + + } catch (Exception e) {} + + changeStateToProcessABlock(); + } + return result; } private int read0() throws IOException { @@ -190,8 +377,11 @@ switch (this.currentState) { case EOF: - return -1; + return END_OF_STREAM;// return -1 + case NO_PROCESS_STATE: + return END_OF_BLOCK;// return -2 + case START_BLOCK_STATE: throw new IllegalStateException(); @@ -225,13 +415,13 @@ } private void init() throws IOException { - int magic2 = this.in.read(); + int magic2 = this.readAByte(in); if (magic2 != 'h') { throw new IOException("Stream is not BZip2 formatted: expected 'h'" + " as first byte but got '" + (char) magic2 + "'"); } - int blockSize = this.in.read(); + int blockSize = this.readAByte(in); if ((blockSize < '1') || (blockSize > '9')) { throw new IOException("Stream is not BZip2 formatted: illegal " + "blocksize " + (char) blockSize); @@ -244,6 +434,27 @@ } private void initBlock() throws IOException { + if (this.readMode == READ_MODE.BYBLOCK) { + // this.checkBlockIntegrity(); + this.storedBlockCRC = bsGetInt(); + this.blockRandomised = bsR(1) == 1; + + /** + * Allocate data here instead in constructor, so we do not allocate + * it if the input file is empty. + */ + if (this.data == null) { + this.data = new Data(this.blockSize100k); + } + + // currBlockNo++; + getAndMoveToFrontDecode(); + + this.crc.initialiseCRC(); + this.currentState = STATE.START_BLOCK_STATE; + return; + } + char magic0 = bsGetUByte(); char magic1 = bsGetUByte(); char magic2 = bsGetUByte(); @@ -261,7 +472,7 @@ magic4 != 0x53 || // 'S' magic5 != 0x59 // 'Y' ) { - this.currentState = EOF; + this.currentState = STATE.EOF; throw new IOException("bad block header"); } else { this.storedBlockCRC = bsGetInt(); @@ -279,7 +490,7 @@ getAndMoveToFrontDecode(); this.crc.initialiseCRC(); - this.currentState = START_BLOCK_STATE; + this.currentState = STATE.START_BLOCK_STATE; } } @@ -295,6 +506,7 @@ this.computedCombinedCRC ^= this.storedBlockCRC; reportCRCError(); + } this.computedCombinedCRC = (this.computedCombinedCRC << 1) @@ -304,7 +516,7 @@ private void complete() throws IOException { this.storedCombinedCRC = bsGetInt(); - this.currentState = EOF; + this.currentState = STATE.EOF; this.data = null; if (this.storedCombinedCRC != this.computedCombinedCRC) { @@ -326,14 +538,14 @@ } } - private int bsR(final int n) throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + private long bsR(final long n) throws IOException { + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < n) { final InputStream inShadow = this.in; do { - int thech = inShadow.read(); + int thech = readAByte(inShadow); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -347,15 +559,16 @@ } this.bsLive = bsLiveShadow - n; - return (bsBuffShadow >> (bsLiveShadow - n)) & ((1 << n) - 1); + final long one = 1; + return (bsBuffShadow >> (bsLiveShadow - n)) & ((one << n) - 1); } private boolean bsGetBit() throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < 1) { - int thech = this.in.read(); + int thech = this.readAByte(in); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -375,7 +588,7 @@ } private int bsGetInt() throws IOException { - return (((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8); + return (int) ((((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8)); } /** @@ -454,8 +667,8 @@ final int alphaSize = this.nInUse + 2; /* Now the selectors */ - final int nGroups = bsR(3); - final int nSelectors = bsR(15); + final int nGroups = (int) bsR(3); + final int nSelectors = (int) bsR(15); for (int i = 0; i < nSelectors; i++) { int j = 0; @@ -486,7 +699,7 @@ /* Now the coding tables */ for (int t = 0; t < nGroups; t++) { - int curr = bsR(5); + int curr = (int) bsR(5); final char[] len_t = len[t]; for (int i = 0; i < alphaSize; i++) { while (bsGetBit()) { @@ -532,7 +745,7 @@ } private void getAndMoveToFrontDecode() throws IOException { - this.origPtr = bsR(24); + this.origPtr = (int) bsR(24); recvDecodingTables(); final InputStream inShadow = this.in; @@ -562,8 +775,8 @@ int groupPos = G_SIZE - 1; final int eob = this.nInUse + 1; int nextSym = getAndMoveToFrontDecode0(0); - int bsBuffShadow = this.bsBuff; - int bsLiveShadow = this.bsLive; + int bsBuffShadow = (int) this.bsBuff; + int bsLiveShadow = (int) this.bsLive; int lastShadow = -1; int zt = selector[groupNo] & 0xff; int[] base_zt = base[zt]; @@ -597,10 +810,8 @@ int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -609,14 +820,14 @@ throw new IOException("unexpected end of stream"); } } - int zvec = (bsBuffShadow >> (bsLiveShadow - zn)) + long zvec = (bsBuffShadow >> (bsLiveShadow - zn)) & ((1 << zn) - 1); bsLiveShadow -= zn; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -630,7 +841,7 @@ zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); } - nextSym = perm_zt[zvec - base_zt[zn]]; + nextSym = perm_zt[(int) (zvec - base_zt[zn])]; } final byte ch = seqToUnseq[yy[0]]; @@ -680,10 +891,8 @@ int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -692,14 +901,14 @@ throw new IOException("unexpected end of stream"); } } - int zvec = (bsBuffShadow >> (bsLiveShadow - zn)) + int zvec = (int) (bsBuffShadow >> (bsLiveShadow - zn)) & ((1 << zn) - 1); bsLiveShadow -= zn; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -709,7 +918,7 @@ } } bsLiveShadow--; - zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); + zvec = (int) ((zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1)); } nextSym = perm_zt[zvec - base_zt[zn]]; } @@ -726,14 +935,14 @@ final int zt = dataShadow.selector[groupNo] & 0xff; final int[] limit_zt = dataShadow.limit[zt]; int zn = dataShadow.minLens[zt]; - int zvec = bsR(zn); - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + int zvec = (int) bsR(zn); + int bsLiveShadow = (int) this.bsLive; + int bsBuffShadow = (int) this.bsBuff; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; @@ -807,12 +1016,16 @@ this.su_ch2 = su_ch2Shadow ^= (this.su_rNToGo == 1) ? 1 : 0; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = RAND_PART_B_STATE; + this.currentState = STATE.RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { endBlock(); - initBlock(); - setupBlock(); + if (readMode == READ_MODE.CONTINUOUS) { + initBlock(); + setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } @@ -824,19 +1037,23 @@ this.su_tPos = this.data.tt[this.su_tPos]; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = NO_RAND_PART_B_STATE; + this.currentState = STATE.NO_RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { - this.currentState = NO_RAND_PART_A_STATE; + this.currentState = STATE.NO_RAND_PART_A_STATE; endBlock(); - initBlock(); - setupBlock(); + if (readMode == READ_MODE.CONTINUOUS) { + initBlock(); + setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } private void setupRandPartB() throws IOException { if (this.su_ch2 != this.su_chPrev) { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_count = 1; setupRandPartA(); } else if (++this.su_count >= 4) { @@ -851,13 +1068,13 @@ this.su_rNToGo--; } this.su_j2 = 0; - this.currentState = RAND_PART_C_STATE; + this.currentState = STATE.RAND_PART_C_STATE; if (this.su_rNToGo == 1) { this.su_z ^= 1; } setupRandPartC(); } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; setupRandPartA(); } } @@ -868,7 +1085,7 @@ this.crc.updateCRC(this.su_ch2); this.su_j2++; } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_i2++; this.su_count = 0; setupRandPartA(); @@ -895,7 +1112,7 @@ this.currentChar = su_ch2Shadow; this.crc.updateCRC(su_ch2Shadow); this.su_j2++; - this.currentState = NO_RAND_PART_C_STATE; + this.currentState = STATE.NO_RAND_PART_C_STATE; } else { this.su_i2++; this.su_count = 0; Index: src/core/org/apache/hadoop/io/compress/SplitEnabledCompressionCodec.java =================================================================== --- src/core/org/apache/hadoop/io/compress/SplitEnabledCompressionCodec.java (revision 0) +++ src/core/org/apache/hadoop/io/compress/SplitEnabledCompressionCodec.java (revision 0) @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Vector; + +import org.apache.hadoop.conf.Configuration; + + +/** + * This interface is meant to be implemented by those compression codecs + * which are capable to compress / de-compress a stream starting at any + * arbitrary position. + * + * Especially the process of de-compressing a stream starting at some arbitrary + * position is challenging. Most of the codecs are only able to successfully + * de-compress a stream, if they start from the very beginning till the end. + * One of the reasons is the stored state at the beginning of the stream which + * is crucial for de-compression. + * + * Yet there are few codecs which do not save the whole state at the beginning + * of the stream and hence can be used to de-compress stream starting at any + * arbitrary points. This interface is meant to be used by such codecs. Such + * codecs are highly valuable, especially in the context of Hadoop, because + * an input compressed file can be split and hence can be worked on by multiple + * machines in parallel. + * @since Hadoop Release 0.19.2 + */ + +public interface SplitEnabledCompressionCodec extends CompressionCodec { + + /** + * During decompression, data can be read off from the decompressor in + * two modes, namely continuous and blocked. Few codecs (e.g. BZip2) are capable of + * compressing data in blocks and then decompressing the blocks. In Blocked reading + * mode codecs inform 'end of block' events to its caller. While in continuous mode, + * the caller of codecs is unaware about the blocks and uncompressed data is spilled + * out like a continuous stream. + */ + public enum READ_MODE {CONTINUOUS, BYBLOCK}; + + + /** + * Create a stream decompressor that will read from the given input stream. The + * mode of reading is dictated by readMode. + * + * @param in the stream to read compressed bytes from + * @param readMode Read mode could be either Continuous or Blocked + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + CompressionInputStream createInputStream(InputStream in, READ_MODE readMode) throws IOException; + + + /** + * This method gives codecs an opportunity to change or shift upper and lower + * ends of a split, if needed by the codecs. The first entry of the vector + * of Integer is the start of split while the second Integer is the end of + * boundary limit. Codecs might change one or both of these limits. It is + * expected that, codecs will return true if either of the two limits are + * changed. This is to signal the caller of this method that eithr of those + * limits have changed. + * @param splitLimits This vector has two entries. Split start and end limit + * @return true if either of the limits are changed + */ + boolean adjustSplitBounds(Vector splitLimits); + + + /** + * Any class implementing this interface is considered capable to handle + * split input. But user can enforce its splitting choice by using a + * job configuration switch. + * + * @param jobConf If "mared.input.codec.nosplitting=true" is found in the + * jobConf then canDecompressSplitInput always return false + * @return true if can de-compress arbitrary split data, false otherwise + */ + boolean shouldSplitInput(Configuration jobConf); + + + +} Property changes on: src/core/org/apache/hadoop/io/compress/SplitEnabledCompressionCodec.java ___________________________________________________________________ Added: svn:executable + *