Index: src/test/org/apache/hadoop/io/compress/TestCodecFactory.java =================================================================== --- src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (revision 720573) +++ src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (working copy) @@ -81,6 +81,19 @@ public String getDefaultExtension() { return ".base"; } + + public boolean canDecompressSplitInput() { + return false; + } + + public CompressionInputStream createInputStream(InputStream in, + READ_MODE readMode) throws IOException { + return null; + } + + public int seekBackwards() { + return 0; + } } private static class BarCodec extends BaseCodec { Index: src/test/org/apache/hadoop/io/compress/TestCodec.java =================================================================== --- src/test/org/apache/hadoop/io/compress/TestCodec.java (revision 720573) +++ src/test/org/apache/hadoop/io/compress/TestCodec.java (working copy) @@ -34,6 +34,7 @@ import org.apache.hadoop.io.RandomDatum; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.CompressionCodec.READ_MODE; public class TestCodec extends TestCase { @@ -116,8 +117,15 @@ DataInputBuffer deCompressedDataBuffer = new DataInputBuffer(); deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, compressedDataBuffer.getLength()); - CompressionInputStream inflateFilter = - codec.createInputStream(deCompressedDataBuffer); + CompressionInputStream inflateFilter = null; + if(codecClass.compareTo("org.apache.hadoop.io.compress.BZip2Codec")==0){ + inflateFilter = + codec.createInputStream(deCompressedDataBuffer, READ_MODE.BYBLOCK); + } + else{ + inflateFilter = + codec.createInputStream(deCompressedDataBuffer); + } DataInputStream inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter)); Index: src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java =================================================================== --- src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (revision 720573) +++ src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (working copy) @@ -90,16 +90,16 @@ file.writeBytes(mapString2 + "\n"); file.close(); file = fileSys.create(new Path(CACHE_FILE)); - file.writeBytes(cacheString); + file.writeBytes(cacheString + "\n"); file.close(); file = fileSys.create(new Path(CACHE_FILE_2)); - file.writeBytes(cacheString2); + file.writeBytes(cacheString2 + "\n"); file.close(); job = new StreamJob(argv, mayExit); job.go(); - fileSys = dfs.getFileSystem(); + fileSys = dfs.getFileSystem(); String line = null; String line2 = null; Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus( Index: src/core/org/apache/hadoop/io/compress/CompressionCodec.java =================================================================== --- src/core/org/apache/hadoop/io/compress/CompressionCodec.java (revision 720573) +++ src/core/org/apache/hadoop/io/compress/CompressionCodec.java (working copy) @@ -26,7 +26,18 @@ * This class encapsulates a streaming compression/decompression pair. */ public interface CompressionCodec { + + /** + * During decompression, data can be read off from the decompressor in + * two modes, namely continuous and blocked. Few codecs (e.g. BZip2) are capable of + * compressing data in blocks and then decompressing the blocks. In Blocked reading + * mode codecs inform 'end of block' events to its caller. While in continuous mode, + * the caller of codecs is unaware about the blocks and uncompressed data is spill + * out like a continuous stream. + */ + public enum READ_MODE {CONTINUOUS, BYBLOCK}; + /** * Create a {@link CompressionOutputStream} that will write to the given * {@link OutputStream}. @@ -86,8 +97,20 @@ CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException; + + /** + * Create a stream decompressor that will read from the given input stream. The + * mode of reading is dictated by readMode. + * + * @param in the stream to read compressed bytes from + * @param readMode Read mode could be either Continuous or Blocked + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + CompressionInputStream createInputStream(InputStream in, READ_MODE readMode) throws IOException; + /** * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. * @@ -107,4 +130,20 @@ * @return the extension including the '.' */ String getDefaultExtension(); + + /** + * If a codec class is capable to decompress data which starts and ends + * at arbitrary locations (which is determined by Hadoop using InputFormat + * provided by the user), this method returns true. In such a case the + * compressed file is considered splitable by Hadoop. + * @return true if can de-compress arbitrary split data, false otherwise + */ + boolean canDecompressSplitInput(); + + /** + * If a codec class wants to seek the stream backwards, it should provide + * the number of bytes to seek here in this method. + * @return the number of bytes to seek backwards + */ + int seekBackwards(); } Index: src/core/org/apache/hadoop/io/compress/LzoCodec.java =================================================================== --- src/core/org/apache/hadoop/io/compress/LzoCodec.java (revision 720573) +++ src/core/org/apache/hadoop/io/compress/LzoCodec.java (working copy) @@ -163,7 +163,13 @@ return new BlockDecompressorStream(in, decompressor, conf.getInt("io.compression.codec.lzo.buffersize", 64*1024)); } + + public CompressionInputStream createInputStream(InputStream in, + READ_MODE readMode) throws IOException { + throw new UnsupportedOperationException(); + } + public Class getDecompressorType() { // Ensure native-lzo library is loaded & initialized if (!isNativeLzoLoaded(conf)) { @@ -195,4 +201,13 @@ public String getDefaultExtension() { return ".lzo_deflate"; } + + public boolean canDecompressSplitInput() { + return false; + } + + public int seekBackwards() { + return 0; + } + } Index: src/core/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java =================================================================== --- src/core/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java (revision 720573) +++ src/core/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java (working copy) @@ -44,6 +44,8 @@ int N_ITERS = 4; int MAX_SELECTORS = (2 + (900000 / G_SIZE)); int NUM_OVERSHOOT_BYTES = 20; + public static final int END_OF_BLOCK = -2; + public static final int END_OF_STREAM = -1; /** * This array really shouldn't be here. Again, for historical purposes it Index: src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java =================================================================== --- src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java (revision 720573) +++ src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java (working copy) @@ -23,13 +23,18 @@ */ package org.apache.hadoop.io.compress.bzip2; +import java.io.BufferedInputStream; import java.io.InputStream; import java.io.IOException; +import java.util.ArrayList; +import org.apache.hadoop.io.compress.CompressionCodec.READ_MODE; +import org.apache.hadoop.io.compress.CompressionInputStream; + /** * An input stream that decompresses from the BZip2 format (without the file * header chars) to be read as any other stream. - * + * *

* The decompression requires large amounts of memory. Thus you should call the * {@link #close() close()} method as soon as possible, to force @@ -37,41 +42,52 @@ * {@link CBZip2OutputStream CBZip2OutputStream} for information about memory * usage. *

- * + * *

* CBZip2InputStream reads bytes from the compressed source stream via * the single byte {@link java.io.InputStream#read() read()} method exclusively. * Thus you should consider to use a buffered source stream. *

- * + * *

+ * This Ant code was enhanced so that it can de-compress blocks of bzip2 data. + * Current position in the stream is an important statistic for Hadoop. For + * example in LineRecordReader, we solely depend on the current position in the + * stream to know about the progess. The notion of position becomes complicated + * for compressed files. The Hadoop splitting is done in terms of compressed + * file. But a compressed file deflates to a large amount of data. So we have + * handled this problem in the following way. + * + * On object creation time, we find the next block start delimiter. Once such a + * marker is found, the stream stops there (we discard any read compressed data + * in this process) and the position is updated (i.e. the caller of this class + * will find out the stream location). At this point we are ready for actual + * reading (i.e. decompression) of data. + * + * The subsequent read calls give out data. The position is updated when the + * caller of this class has read off the current block + 1 bytes. In between the + * block reading, position is not updated. (We can only update the postion on + * block boundaries). + *

+ * + *

* Instances of this class are not threadsafe. *

*/ public class CBZip2InputStream extends InputStream implements BZip2Constants { - private static void reportCRCError() throws IOException { - // The clean way would be to throw an exception. - // throw new IOException("crc error"); - - // Just print a message, like the previous versions of this class did - System.err.println("BZip2 CRC error"); - } - - private void makeMaps() { - final boolean[] inUse = this.data.inUse; - final byte[] seqToUnseq = this.data.seqToUnseq; - - int nInUseShadow = 0; - - for (int i = 0; i < 256; i++) { - if (inUse[i]) - seqToUnseq[nInUseShadow++] = (byte) i; - } - - this.nInUse = nInUseShadow; - } - + public static final long BLOCK_DELIMITER = 0X314159265359L;// start of block + public static final long EOS_DELIMITER = 0X177245385090L;// end of bzip2 + // stream + private static final int DELIMITER_BIT_LENGTH = 48; + READ_MODE readMode = READ_MODE.CONTINUOUS; + // The variable records the current advertised position of the stream. + private long reportedBytesReadFromCompressedStream = 0L; + // The following variable keep record of read compressed bytes. + private long bytesReadFromCompressedStream = 0L; + private boolean isBlockFinished = false; + private ArrayList uncompressedData = new ArrayList( + 1024 * 2); /** * Index of the last char in the block, so the block size == last + 1. */ @@ -88,32 +104,29 @@ */ private int blockSize100k; - private boolean blockRandomised; + private boolean blockRandomised = false; - private int bsBuff; - private int bsLive; + private long bsBuff; + private long bsLive; private final CRC crc = new CRC(); private int nInUse; - private InputStream in; + private BufferedInputStream in; private int currentChar = -1; - private static final int EOF = 0; - private static final int START_BLOCK_STATE = 1; - private static final int RAND_PART_A_STATE = 2; - private static final int RAND_PART_B_STATE = 3; - private static final int RAND_PART_C_STATE = 4; - private static final int NO_RAND_PART_A_STATE = 5; - private static final int NO_RAND_PART_B_STATE = 6; - private static final int NO_RAND_PART_C_STATE = 7; + public enum STATE { + EOF, START_BLOCK_STATE, RAND_PART_A_STATE, RAND_PART_B_STATE, RAND_PART_C_STATE, NO_RAND_PART_A_STATE, NO_RAND_PART_B_STATE, NO_RAND_PART_C_STATE, NO_PROCESS_STATE + }; - private int currentState = START_BLOCK_STATE; + private STATE currentState = STATE.START_BLOCK_STATE; private int storedBlockCRC, storedCombinedCRC; private int computedBlockCRC, computedCombinedCRC; + private boolean result = false;// used by skipToNextMarker + // Variables used by setup* methods exclusively private int su_count; @@ -132,29 +145,152 @@ private CBZip2InputStream.Data data; /** + * This method reports the processed bytes so far. Please note that this + * statistic is only updated on block boundaries and only when the stream is + * initiated in BYBLOCK mode. + */ + public long getProcessedByteCount() { + return reportedBytesReadFromCompressedStream; + } + + public void updateProcessedByteCount(int count) { + this.bytesReadFromCompressedStream += count; + } + + public void updateReportedByteCount(int count) { + this.reportedBytesReadFromCompressedStream += count; + this.updateProcessedByteCount(count); + } + + /** + * This method reads a Byte from the compressed stream. Whenever we need to + * read from the underlying compressed stream, this method should be called + * instead of directly calling the read method of the underlying compressed + * stream. This method does important record keeping to have the statistic + * that how many bytes have been read off the compressed stream. + */ + private int readAByte(InputStream inStream) throws IOException { + int read = inStream.read(); + if (read >= 0) { + this.updateProcessedByteCount(1); + } + + return read; + + } + + /** + * This method tries to find the end of block (EOB) delimiter in the stream, + * starting from the current position of the stream. If EOB is found, the + * stream position will be right after EOB. + * + * @throws Exception + */ + public boolean skipToNextMarker(long marker, int markerBitLength) + throws Exception { + try { + if (markerBitLength > 63) { + throw new Exception( + "skitToNextMarket can not find patterns greater than 63 bits"); + } + // pick next marketBitLength bits in the stream + long bytes = 0; + bytes = this.bsR(markerBitLength); + if (bytes == -1) { + return false; + } + while (true) { + if (bytes == marker) { + return true; + + } else { + bytes = bytes << 1; + bytes = bytes & ((1L << markerBitLength) - 1); + int oneBit = (int) this.bsR(1); + if (oneBit != -1) { + bytes = bytes | oneBit; + } else + return false; + } + } + } catch (IOException ex) { + return false; + } + } + + private static void reportCRCError() throws IOException { + throw new IOException("crc error"); + } + + private void makeMaps() { + final boolean[] inUse = this.data.inUse; + final byte[] seqToUnseq = this.data.seqToUnseq; + + int nInUseShadow = 0; + + for (int i = 0; i < 256; i++) { + if (inUse[i]) + seqToUnseq[nInUseShadow++] = (byte) i; + } + + this.nInUse = nInUseShadow; + } + + /** * Constructs a new CBZip2InputStream which decompresses bytes read from the * specified stream. - * + * *

* Although BZip2 headers are marked with the magic "Bz" this * constructor expects the next byte in the stream to be the first one after * the magic. Thus callers have to skip the first two bytes. Otherwise this * constructor will throw an exception. *

- * + * * @throws IOException * if the stream content is malformed or an I/O error occurs. * @throws NullPointerException * if in == null */ - public CBZip2InputStream(final InputStream in) throws IOException { + public CBZip2InputStream(final InputStream in, READ_MODE readMode) + throws IOException { + super(); + int blockSize = 0X39;// i.e 9 + this.blockSize100k = blockSize - '0'; + this.in = new BufferedInputStream(in, 1024 * 9);// >1 MB buffer + this.readMode = readMode; + if (readMode == READ_MODE.CONTINUOUS) { + currentState = STATE.START_BLOCK_STATE; + init(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + try { + result = this + .skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, + DELIMITER_BIT_LENGTH); + this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream; + changeStateToProcessABlock(); + } catch (Exception e) { + } + } + } - this.in = in; - init(); + public CBZip2InputStream(final InputStream in) throws IOException { + this(in, READ_MODE.CONTINUOUS); } + private void changeStateToProcessABlock() throws IOException { + if (result == true) { + initBlock(); + setupBlock(); + } else { + this.currentState = STATE.EOF; + } + } + public int read() throws IOException { + if (this.in != null) { return read0(); } else { @@ -162,6 +298,10 @@ } } + private boolean reportedEOB = false; + private boolean shouldAdvertiseStreamPostion = false; + private long bytesReadFromCompressedStream2 = 0; + public int read(final byte[] dest, final int offs, final int len) throws IOException { if (offs < 0) { @@ -178,13 +318,49 @@ throw new IOException("stream closed"); } + if (shouldAdvertiseStreamPostion == true) { + this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream2 + 1; + shouldAdvertiseStreamPostion = false; + + } + final int hi = offs + len; int destOffs = offs; - for (int b; (destOffs < hi) && ((b = read0()) >= 0);) { + int b = 0; + + // if EOB event is already reported in previous read call, now its time + // to move on. + if (reportedEOB == true) { + reportedEOB = false; + try { + result = this + .skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, + DELIMITER_BIT_LENGTH); + bytesReadFromCompressedStream2 = this.bytesReadFromCompressedStream; + } catch (Exception e) { + } + + changeStateToProcessABlock(); + shouldAdvertiseStreamPostion = true; + } + + for (; ((destOffs < hi) && ((b = read0())) >= 0);) { dest[destOffs++] = (byte) b; + // Following is a hack to ensure that, this class can advertise the + // stream + // position exactly at a block boundary + 1 Byte (in terms of + // compressed data) + if (shouldAdvertiseStreamPostion == true) { + break; + } } - return (destOffs == offs) ? -1 : (destOffs - offs); + int result = destOffs - offs; + if (result == 0) { + result = b; + reportedEOB = true; + } + return result; } private int read0() throws IOException { @@ -192,8 +368,11 @@ switch (this.currentState) { case EOF: - return -1; + return END_OF_STREAM;// return -1 + case NO_PROCESS_STATE: + return END_OF_BLOCK;// return -2 + case START_BLOCK_STATE: throw new IllegalStateException(); @@ -227,13 +406,13 @@ } private void init() throws IOException { - int magic2 = this.in.read(); + int magic2 = this.readAByte(in); if (magic2 != 'h') { throw new IOException("Stream is not BZip2 formatted: expected 'h'" + " as first byte but got '" + (char) magic2 + "'"); } - int blockSize = this.in.read(); + int blockSize = this.readAByte(in); if ((blockSize < '1') || (blockSize > '9')) { throw new IOException("Stream is not BZip2 formatted: illegal " + "blocksize " + (char) blockSize); @@ -246,6 +425,27 @@ } private void initBlock() throws IOException { + if (this.readMode == READ_MODE.BYBLOCK) { + // this.checkBlockIntegrity(); + this.storedBlockCRC = bsGetInt(); + this.blockRandomised = bsR(1) == 1; + + /** + * Allocate data here instead in constructor, so we do not allocate + * it if the input file is empty. + */ + if (this.data == null) { + this.data = new Data(this.blockSize100k); + } + + // currBlockNo++; + getAndMoveToFrontDecode(); + + this.crc.initialiseCRC(); + this.currentState = STATE.START_BLOCK_STATE; + return; + } + char magic0 = bsGetUByte(); char magic1 = bsGetUByte(); char magic2 = bsGetUByte(); @@ -263,7 +463,7 @@ magic4 != 0x53 || // 'S' magic5 != 0x59 // 'Y' ) { - this.currentState = EOF; + this.currentState = STATE.EOF; throw new IOException("bad block header"); } else { this.storedBlockCRC = bsGetInt(); @@ -281,11 +481,12 @@ getAndMoveToFrontDecode(); this.crc.initialiseCRC(); - this.currentState = START_BLOCK_STATE; + this.currentState = STATE.START_BLOCK_STATE; } } private void endBlock() throws IOException { + isBlockFinished = true; this.computedBlockCRC = this.crc.getFinalCRC(); // A bad CRC is considered a fatal error. @@ -297,6 +498,7 @@ this.computedCombinedCRC ^= this.storedBlockCRC; reportCRCError(); + } this.computedCombinedCRC = (this.computedCombinedCRC << 1) @@ -306,7 +508,7 @@ private void complete() throws IOException { this.storedCombinedCRC = bsGetInt(); - this.currentState = EOF; + this.currentState = STATE.EOF; this.data = null; if (this.storedCombinedCRC != this.computedCombinedCRC) { @@ -328,14 +530,14 @@ } } - private int bsR(final int n) throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + private long bsR(final long n) throws IOException { + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < n) { final InputStream inShadow = this.in; do { - int thech = inShadow.read(); + int thech = readAByte(inShadow); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -349,15 +551,16 @@ } this.bsLive = bsLiveShadow - n; - return (bsBuffShadow >> (bsLiveShadow - n)) & ((1 << n) - 1); + final long one = 1; + return (bsBuffShadow >> (bsLiveShadow - n)) & ((one << n) - 1); } private boolean bsGetBit() throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < 1) { - int thech = this.in.read(); + int thech = this.readAByte(in); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -377,7 +580,7 @@ } private int bsGetInt() throws IOException { - return (((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8); + return (int) ((((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8)); } /** @@ -456,8 +659,8 @@ final int alphaSize = this.nInUse + 2; /* Now the selectors */ - final int nGroups = bsR(3); - final int nSelectors = bsR(15); + final int nGroups = (int) bsR(3); + final int nSelectors = (int) bsR(15); for (int i = 0; i < nSelectors; i++) { int j = 0; @@ -488,7 +691,7 @@ /* Now the coding tables */ for (int t = 0; t < nGroups; t++) { - int curr = bsR(5); + int curr = (int) bsR(5); final char[] len_t = len[t]; for (int i = 0; i < alphaSize; i++) { while (bsGetBit()) { @@ -534,7 +737,7 @@ } private void getAndMoveToFrontDecode() throws IOException { - this.origPtr = bsR(24); + this.origPtr = (int) bsR(24); recvDecodingTables(); final InputStream inShadow = this.in; @@ -564,8 +767,8 @@ int groupPos = G_SIZE - 1; final int eob = this.nInUse + 1; int nextSym = getAndMoveToFrontDecode0(0); - int bsBuffShadow = this.bsBuff; - int bsLiveShadow = this.bsLive; + int bsBuffShadow = (int) this.bsBuff; + int bsLiveShadow = (int) this.bsLive; int lastShadow = -1; int zt = selector[groupNo] & 0xff; int[] base_zt = base[zt]; @@ -599,10 +802,8 @@ int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -611,14 +812,14 @@ throw new IOException("unexpected end of stream"); } } - int zvec = (bsBuffShadow >> (bsLiveShadow - zn)) + long zvec = (bsBuffShadow >> (bsLiveShadow - zn)) & ((1 << zn) - 1); bsLiveShadow -= zn; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -632,7 +833,7 @@ zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); } - nextSym = perm_zt[zvec - base_zt[zn]]; + nextSym = perm_zt[(int) (zvec - base_zt[zn])]; } final byte ch = seqToUnseq[yy[0]]; @@ -682,10 +883,8 @@ int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -694,14 +893,14 @@ throw new IOException("unexpected end of stream"); } } - int zvec = (bsBuffShadow >> (bsLiveShadow - zn)) + int zvec = (int) (bsBuffShadow >> (bsLiveShadow - zn)) & ((1 << zn) - 1); bsLiveShadow -= zn; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -711,7 +910,7 @@ } } bsLiveShadow--; - zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); + zvec = (int) ((zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1)); } nextSym = perm_zt[zvec - base_zt[zn]]; } @@ -728,14 +927,14 @@ final int zt = dataShadow.selector[groupNo] & 0xff; final int[] limit_zt = dataShadow.limit[zt]; int zn = dataShadow.minLens[zt]; - int zvec = bsR(zn); - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + int zvec = (int) bsR(zn); + int bsLiveShadow = (int) this.bsLive; + int bsBuffShadow = (int) this.bsBuff; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; @@ -809,12 +1008,16 @@ this.su_ch2 = su_ch2Shadow ^= (this.su_rNToGo == 1) ? 1 : 0; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = RAND_PART_B_STATE; + this.currentState = STATE.RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { endBlock(); - initBlock(); - setupBlock(); + if (readMode == READ_MODE.CONTINUOUS) { + initBlock(); + setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } @@ -826,19 +1029,23 @@ this.su_tPos = this.data.tt[this.su_tPos]; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = NO_RAND_PART_B_STATE; + this.currentState = STATE.NO_RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { - this.currentState = NO_RAND_PART_A_STATE; + this.currentState = STATE.NO_RAND_PART_A_STATE; endBlock(); - initBlock(); - setupBlock(); + if (readMode == READ_MODE.CONTINUOUS) { + initBlock(); + setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } private void setupRandPartB() throws IOException { if (this.su_ch2 != this.su_chPrev) { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_count = 1; setupRandPartA(); } else if (++this.su_count >= 4) { @@ -853,13 +1060,13 @@ this.su_rNToGo--; } this.su_j2 = 0; - this.currentState = RAND_PART_C_STATE; + this.currentState = STATE.RAND_PART_C_STATE; if (this.su_rNToGo == 1) { this.su_z ^= 1; } setupRandPartC(); } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; setupRandPartA(); } } @@ -870,7 +1077,7 @@ this.crc.updateCRC(this.su_ch2); this.su_j2++; } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_i2++; this.su_count = 0; setupRandPartA(); @@ -897,7 +1104,7 @@ this.currentChar = su_ch2Shadow; this.crc.updateCRC(su_ch2Shadow); this.su_j2++; - this.currentState = NO_RAND_PART_C_STATE; + this.currentState = STATE.NO_RAND_PART_C_STATE; } else { this.su_i2++; this.su_count = 0; @@ -928,7 +1135,7 @@ final int[] cftab = new int[257]; // 1028 byte final char[] getAndMoveToFrontDecode_yy = new char[256]; // 512 byte final char[][] temp_charArray2d = new char[N_GROUPS][MAX_ALPHA_SIZE]; // 3096 - // byte + // byte final byte[] recvDecodingTables_pos = new byte[N_GROUPS]; // 6 byte // --------------- // 60798 byte @@ -948,7 +1155,7 @@ /** * Initializes the {@link #tt} array. - * + * * This method is called when the required length of the array is known. * I don't initialize it at construction time to avoid unneccessary * memory allocation when compressing small files. @@ -967,5 +1174,66 @@ return ttShadow; } + private Data makeClone() { + + Data other = new Data(9); + + for (int i = 0; i < 256; i++) { + other.inUse[i] = this.inUse[i]; + } + for (int i = 0; i < 256; i++) { + other.seqToUnseq[i] = this.seqToUnseq[i]; + } + for (int i = 0; i < MAX_SELECTORS; i++) { + other.selector[i] = this.selector[i]; + } + for (int i = 0; i < MAX_SELECTORS; i++) { + other.selectorMtf[i] = this.selectorMtf[i]; + } + for (int i = 0; i < 256; i++) { + other.unzftab[i] = this.unzftab[i]; + } + for (int i = 0; i < N_GROUPS; i++) { + for (int j = 0; j < MAX_ALPHA_SIZE; j++) { + other.limit[i][j] = this.limit[i][j]; + } + + } + for (int i = 0; i < N_GROUPS; i++) { + for (int j = 0; j < MAX_ALPHA_SIZE; j++) { + other.base[i][j] = this.base[i][j]; + } + } + for (int i = 0; i < N_GROUPS; i++) { + for (int j = 0; j < MAX_ALPHA_SIZE; j++) { + other.perm[i][j] = this.perm[i][j]; + } + } + for (int i = 0; i < N_GROUPS; i++) { + other.minLens[i] = this.minLens[i]; + } + for (int i = 0; i < 257; i++) { + other.cftab[i] = this.cftab[i]; + } + for (int i = 0; i < 256; i++) { + other.getAndMoveToFrontDecode_yy[i] = this.getAndMoveToFrontDecode_yy[i]; + } + for (int i = 0; i < N_GROUPS; i++) { + for (int j = 0; j < MAX_ALPHA_SIZE; j++) { + other.temp_charArray2d[i][j] = this.temp_charArray2d[i][j]; + } + } + for (int i = 0; i < N_GROUPS; i++) { + other.recvDecodingTables_pos[i] = this.recvDecodingTables_pos[i]; + } + for (int i = 0; i < this.tt.length; i++) { + other.tt[i] = this.tt[i]; + } + for (int i = 0; i < this.ll8.length; i++) { + other.ll8[i] = this.ll8[i]; + } + return other; + } + } } Index: src/core/org/apache/hadoop/io/compress/DefaultCodec.java =================================================================== --- src/core/org/apache/hadoop/io/compress/DefaultCodec.java (revision 720573) +++ src/core/org/apache/hadoop/io/compress/DefaultCodec.java (working copy) @@ -38,16 +38,16 @@ return conf; } - public CompressionOutputStream createOutputStream(OutputStream out) + public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return new CompressorStream(out, createCompressor(), + return new CompressorStream(out, createCompressor(), conf.getInt("io.file.buffer.size", 4*1024)); } - public CompressionOutputStream createOutputStream(OutputStream out, - Compressor compressor) + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { - return new CompressorStream(out, compressor, + return new CompressorStream(out, compressor, conf.getInt("io.file.buffer.size", 4*1024)); } @@ -59,16 +59,21 @@ return ZlibFactory.getZlibCompressor(conf); } - public CompressionInputStream createInputStream(InputStream in) + public CompressionInputStream createInputStream(InputStream in) throws IOException { return new DecompressorStream(in, createDecompressor(), conf.getInt("io.file.buffer.size", 4*1024)); } - public CompressionInputStream createInputStream(InputStream in, - Decompressor decompressor) + public CompressionInputStream createInputStream(InputStream in, + READ_MODE readMode) throws IOException { + throw new UnsupportedOperationException(); + } + + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) throws IOException { - return new DecompressorStream(in, decompressor, + return new DecompressorStream(in, decompressor, conf.getInt("io.file.buffer.size", 4*1024)); } @@ -84,4 +89,13 @@ return ".deflate"; } + + public boolean canDecompressSplitInput() { + return false; + } + + public int seekBackwards() { + return 0; + } + } Index: src/core/org/apache/hadoop/io/compress/BZip2Codec.java =================================================================== --- src/core/org/apache/hadoop/io/compress/BZip2Codec.java (revision 720573) +++ src/core/org/apache/hadoop/io/compress/BZip2Codec.java (working copy) @@ -23,8 +23,10 @@ import java.io.InputStream; import java.io.OutputStream; +import org.apache.hadoop.io.compress.bzip2.BZip2Constants; import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream; +import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.STATE; /** * This class provides CompressionOutputStream and CompressionInputStream for @@ -38,6 +40,9 @@ private static final String HEADER = "BZ"; private static final int HEADER_LEN = HEADER.length(); + private static final String SUB_HEADER = "h9"; + private static final int SUB_HEADER_LEN = SUB_HEADER.length(); + private static final int MAX_BZIP2_BLOCK_SPAN = 9; /** * Creates a new instance of BZip2Codec @@ -117,6 +122,26 @@ } /** + * Creates CompressionInputStream to be used to read off uncompressed data in one of + * the two reading modes. i.e. Continuous or Blocked reading modes + * + * @param in + * The InputStream + * @param readMode + * The reading mode + * + * @return Returns CompressionInputStream for BZip2 + * @throws java.io.IOException + * Throws IOException + */ + public CompressionInputStream createInputStream(InputStream in, + READ_MODE readMode) throws IOException { + CompressionInputStream compressionInputStream = null; + compressionInputStream = new BZip2CompressionInputStream(in, readMode); + return compressionInputStream; + } + + /** * This functionality is currently not supported. * * @throws java.lang.UnsupportedOperationException @@ -145,6 +170,38 @@ return ".bz2"; } + /** + * bzip2 can work with Hadoop generated file splits. For that + * bzip2 codecs must be used in Blocked mode while creating + * CompressionInputStream + * + * @return boolean true + */ + public boolean canDecompressSplitInput() { + return true; + } + + /** + * To handle a case when Hadoop split is exactly on BZip2 + * block, we need to skip back. A BZip2 block start delimiter + * can span up to 7 bytes (because BZip2 is bit oriented stream). + * But at the very start of a BZip2 stream we have "BZh9" so there + * are 10 bytes before the start of data. Since caller of this + * class might do special handling for the first block (like + * LineRecordReader does by not reading a line if it is the first + * block), so BZip2 skip back 9 bytes to facilitate this special + * handling. + * + * @return int No of bytes to be skiped back + */ + public int seekBackwards() { + // A BZip2 block delimiter can be spread in 7 bytes. + // 1 bit, 8, 8, 8, 8, 8, 7 bits. It is necessary to seek backwards + // to handle a case when Hadoop split is exactly on a BZip2 block + // marker. + return this.MAX_BZIP2_BLOCK_SPAN; + } + private static class BZip2CompressionOutputStream extends CompressionOutputStream { // class data starts here// @@ -202,23 +259,48 @@ // class data starts here// private CBZip2InputStream input; + private BufferedInputStream bufferedIn; + private boolean isHeaderStripped = false; + private boolean isSubHeaderStripped = false; + private READ_MODE readMode = READ_MODE.CONTINUOUS; + private long startingPos = 0L; // class data ends here// public BZip2CompressionInputStream(InputStream in) throws IOException { + this(in, READ_MODE.CONTINUOUS); + } + public BZip2CompressionInputStream(InputStream in, READ_MODE readMode) + throws IOException { super(in); - BufferedInputStream bufferedIn = readStreamHeader(); - input = new CBZip2InputStream(bufferedIn); + bufferedIn = new BufferedInputStream(super.in); + this.startingPos = super.getPos(); + this.readMode = readMode; + if(this.startingPos == 0){ + //We only strip header if it is start of file + bufferedIn = readStreamHeader(); + } + input = new CBZip2InputStream(bufferedIn, readMode); + if (this.isHeaderStripped) { + input.updateReportedByteCount(HEADER_LEN); + } + + if (this.isSubHeaderStripped) { + input.updateReportedByteCount(SUB_HEADER_LEN); + } } + + + private BufferedInputStream readStreamHeader() throws IOException { // We are flexible enough to allow the compressed stream not to // start with the header of BZ. So it works fine either we have // the header or not. - BufferedInputStream bufferedIn = null; + //BufferedInputStream bufferedIn = null; if (super.in != null) { - bufferedIn = new BufferedInputStream(super.in); + //bufferedIn = new BufferedInputStream(super.in); bufferedIn.mark(HEADER_LEN); byte[] headerBytes = new byte[HEADER_LEN]; int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN); @@ -226,9 +308,20 @@ String header = new String(headerBytes); if (header.compareTo(HEADER) != 0) { bufferedIn.reset(); - } + } else { + this.isHeaderStripped = true; + //In case of BYBLOCK mode, we also want to strip off + //remaining two character of the header. + if (this.readMode == READ_MODE.BYBLOCK) { + actualRead = bufferedIn.read(headerBytes, 0, + SUB_HEADER_LEN); + if (actualRead != -1) { + this.isSubHeaderStripped = true; + } + } + }//end of else } - } + }//end of outer most if if (bufferedIn == null) { throw new IOException("Failed to read bzip2 stream."); @@ -244,8 +337,16 @@ public int read(byte[] b, int off, int len) throws IOException { - return this.input.read(b, off, len); + + int result = 0; + do{ + result = this.input.read(b, off, len); + } + while(result == BZip2Constants.END_OF_BLOCK); + + return result; + } public void resetState() throws IOException { @@ -253,7 +354,13 @@ } public int read() throws IOException { - return this.input.read(); + int result = 0; + do{ + result = this.input.read(); + } + while(result == BZip2Constants.END_OF_BLOCK); + + return result; } @@ -263,7 +370,9 @@ } } - - }// end of BZip2CompressionInputStream - + + public long getPos(){ + return this.startingPos + this.input.getProcessedByteCount(); + } + }// end of BZip2CompressionInputStream } Index: src/core/org/apache/hadoop/io/compress/CompressionInputStream.java =================================================================== --- src/core/org/apache/hadoop/io/compress/CompressionInputStream.java (revision 720573) +++ src/core/org/apache/hadoop/io/compress/CompressionInputStream.java (working copy) @@ -21,6 +21,9 @@ import java.io.IOException; import java.io.InputStream; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; + /** * A compression input stream. * @@ -28,11 +31,12 @@ * reposition the underlying input stream then call {@link #resetState()}, * without having to also synchronize client buffers. */ -public abstract class CompressionInputStream extends InputStream { +public abstract class CompressionInputStream extends InputStream implements Seekable { /** * The input stream to be compressed. */ protected final InputStream in; + protected long maxAvailableData = 0L; /** * Create a compression input stream that reads @@ -41,6 +45,12 @@ * @param in The input stream to be compressed. */ protected CompressionInputStream(InputStream in) { + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + try { + this.maxAvailableData = in.available(); + } catch (IOException e) { + } + } this.in = in; } @@ -59,5 +69,25 @@ * as the underlying stream may have been repositioned. */ public abstract void resetState() throws IOException; - + + public long getPos() throws IOException { + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){ + //This way of getting the current position will not work for file + //size which can be fit in an int and hence can not be returned by + //available method. + return (this.maxAvailableData - this.in.available()); + } + else{ + return ((Seekable)this.in).getPos(); + } + + } + + public void seek(long pos) throws IOException { + throw new UnsupportedOperationException(); + } + + public boolean seekToNewSource(long targetPos) throws IOException { + throw new UnsupportedOperationException(); + } } Index: src/core/org/apache/hadoop/fs/FSInputChecker.java =================================================================== --- src/core/org/apache/hadoop/fs/FSInputChecker.java (revision 720573) +++ src/core/org/apache/hadoop/fs/FSInputChecker.java (working copy) @@ -295,12 +295,12 @@ @Override public synchronized long getPos() throws IOException { - return chunkPos-(count-pos); + return chunkPos-((count-pos)<0?0:count-pos); } @Override public synchronized int available() throws IOException { - return count-pos; + return (count-pos)<0?0:count-pos; } /** Index: src/core/org/apache/hadoop/util/LineReader.java =================================================================== --- src/core/org/apache/hadoop/util/LineReader.java (revision 720573) +++ src/core/org/apache/hadoop/util/LineReader.java (working copy) @@ -186,5 +186,9 @@ public int readLine(Text str) throws IOException { return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); } + + public long getBufferedDataLength(){ + return (bufferPosn >= bufferLength? 0: bufferLength - bufferPosn); + } } Index: src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java =================================================================== --- src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (revision 720573) +++ src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (working copy) @@ -101,7 +101,7 @@ return true; } - public float getProgress() { + public float getProgress() throws IOException { return lineRecordReader.getProgress(); } Index: src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java =================================================================== --- src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java (revision 720573) +++ src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java (working copy) @@ -97,7 +97,15 @@ numLines++; length += num; if (numLines == N) { - splits.add(new FileSplit(fileName, begin, length, new String[]{})); + // NLineInputFormat uses LineRecordReader, which always reads (and consumes) + //at least one character out of its upper split boundary. So to make sure that + //each mapper gets N lines, we move back the upper split limits of each split + //by one character here. + if (begin == 0) { + splits.add(new FileSplit(fileName, begin, length - 1, new String[] {})); + } else { + splits.add(new FileSplit(fileName, begin - 1, length, new String[] {})); + } begin += length; length = 0; numLines = 0; Index: src/mapred/org/apache/hadoop/mapred/LineRecordReader.java =================================================================== --- src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (revision 720573) +++ src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; @@ -44,8 +45,10 @@ private long start; private long pos; private long end; - private LineReader in; + private LineReader lineReader; int maxLineLength; + private InputStream input = null; + private FSDataInputStream fileIn; /** * A class that provides a line reader from an input stream. @@ -75,31 +78,52 @@ final CompressionCodec codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split - FileSystem fs = file.getFileSystem(job); - FSDataInputStream fileIn = fs.open(split.getPath()); - boolean skipFirstLine = false; + FileSystem fs = file.getFileSystem(job); + fileIn = fs.open(split.getPath()); + fileIn.seek(start); + if (codec != null) { - in = new LineReader(codec.createInputStream(fileIn), job); - end = Long.MAX_VALUE; - } else { - if (start != 0) { - skipFirstLine = true; - --start; + if(codec.canDecompressSplitInput()){ + long oldStart = start; + start = (start - codec.seekBackwards()) < 0? 0 : start-codec.seekBackwards(); fileIn.seek(start); + this.input = codec.createInputStream(fileIn, CompressionCodec.READ_MODE.BYBLOCK); + if(this.getPos() <= oldStart){ + start = oldStart; + fileIn.seek(start); + this.input = codec.createInputStream(fileIn, CompressionCodec.READ_MODE.BYBLOCK); + } + lineReader = new LineReader(this.input); } - in = new LineReader(fileIn, job); + else{ + this.input = codec.createInputStream(fileIn); + lineReader = new LineReader(this.input, job); + end = Long.MAX_VALUE; + } + + } else {//for plain Text + input = new org.apache.hadoop.fs.FSDataInputStream(fileIn) { + + public long getPos() throws IOException{ + return ((Seekable)this.in).getPos() - lineReader.getBufferedDataLength(); + } + + }; + + lineReader = new LineReader(this.input, job); } - if (skipFirstLine) { // skip first line and re-establish "start". - start += in.readLine(new Text(), 0, - (int)Math.min((long)Integer.MAX_VALUE, end - start)); + // If this is not the first split, we always throw away first record + // because we always (except the last split) read one extra line in + // next() method. + if (start != 0) { + lineReader.readLine(new Text(), maxLineLength, Integer.MAX_VALUE); } - this.pos = start; } public LineRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) { this.maxLineLength = maxLineLength; - this.in = new LineReader(in); + this.lineReader = new LineReader(in); this.start = offset; this.pos = offset; this.end = endOffset; @@ -110,7 +134,7 @@ throws IOException{ this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); - this.in = new LineReader(in, job); + this.lineReader = new LineReader(in, job); this.start = offset; this.pos = offset; this.end = endOffset; @@ -128,16 +152,17 @@ public synchronized boolean next(LongWritable key, Text value) throws IOException { - while (pos < end) { + // We always read one extra line, which lies outside the upper + // split limit i.e. (end - 1) + pos = this.getPos(); + + while (pos <= end) { key.set(pos); - int newSize = in.readLine(value, maxLineLength, - Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), - maxLineLength)); + int newSize = lineReader.readLine(value, maxLineLength, Integer.MAX_VALUE); if (newSize == 0) { return false; } - pos += newSize; if (newSize < maxLineLength) { return true; } @@ -151,22 +176,24 @@ /** * Get the progress within the split + * @throws IOException */ - public float getProgress() { + public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { + pos = this.getPos(); return Math.min(1.0f, (pos - start) / (float)(end - start)); } } public synchronized long getPos() throws IOException { - return pos; + return ((Seekable)input).getPos(); } public synchronized void close() throws IOException { - if (in != null) { - in.close(); + if (lineReader != null) { + lineReader.close(); } } } Index: src/mapred/org/apache/hadoop/mapred/TextInputFormat.java =================================================================== --- src/mapred/org/apache/hadoop/mapred/TextInputFormat.java (revision 720573) +++ src/mapred/org/apache/hadoop/mapred/TextInputFormat.java (working copy) @@ -38,7 +38,13 @@ } protected boolean isSplitable(FileSystem fs, Path file) { - return compressionCodecs.getCodec(file) == null; + CompressionCodec codecs = compressionCodecs.getCodec(file); + if(codecs != null){ + return codecs.canDecompressSplitInput(); + } + else{ + return true; + } } public RecordReader getRecordReader(