Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (revision 1511538) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (working copy) @@ -191,6 +191,26 @@ this.compression = compression; } + /** + * Reads exactly {@code len} bytes from {@code in} into {@code arr} at {@code offset}. + * Loops because one {@code in.read} call may return fewer bytes than requested. + * + * @throws java.io.EOFException if the stream ends before {@code len} bytes were read + * @throws IOException if the underlying read fails + */ + void read(InputStream in, byte[] arr, int offset, int len) throws IOException { + int bytesRead = 0; + while (bytesRead < len) { + int n = in.read(arr, offset + bytesRead, len - bytesRead); + if (n < 0) { + // Stream ended before the buffer was filled: throw the EOF-specific IOException + // subclass so callers can tell a truncated stream from other I/O failures. + throw new java.io.EOFException("Failed read of input, read " + bytesRead + " bytes"); + } + bytesRead += n; + } + } + + @Override protected Cell parseCell() throws IOException { int keylength = StreamUtils.readRawVarint32(in); @@ -219,7 +239,7 @@ pos += elemLen; // the rest - in.read(backingArray, pos, length - pos); + read(in, backingArray, pos, length - pos); return new KeyValue(backingArray); } @@ -229,7 +249,7 @@ // status byte indicating that data to be read is not in dictionary. // if this isn't in the dictionary, we need to add to the dictionary. 
int length = StreamUtils.readRawVarint32(in); - in.read(to, offset, length); + read(in, to, offset, length); dict.addEntry(to, offset, length); return length; } else { Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java (revision 1511538) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java (working copy) @@ -18,12 +18,10 @@ package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -54,7 +52,7 @@ @Category(MediumTests.class) public class TestReplicationHLogReaderManager { - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static HBaseTestingUtility TEST_UTIL; private static Configuration conf; private static Path hbaseDir; private static FileSystem fs; @@ -73,9 +71,11 @@ @BeforeClass public static void setUpBeforeClass() throws Exception { + conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + TEST_UTIL = new HBaseTestingUtility(); TEST_UTIL.startMiniDFSCluster(3); - conf = TEST_UTIL.getConfiguration(); hbaseDir = TEST_UTIL.createRootDir(); cluster = TEST_UTIL.getDFSCluster(); fs = cluster.getFileSystem(); @@ -155,16 +155,41 @@ fail(); } catch (EOFException ex) {} + + // Test for HBASE-8615: append nbRows edits of walEditKVs KeyValues each, then read them back. + // nbRows = 1500 reproduces the IndexOutOfBoundsException on Hadoop 2; use 60000 on 
Hadoop 1. + // nbRows = 1500 reproduces the InvalidProtocolBufferException on Hadoop 2 (no repro on Hadoop 1). + int nbRows = 1500; + // walEditKVs = 100 reproduces the IndexOutOfBoundsException. + // walEditKVs = 1 reproduces the InvalidProtocolBufferException. + int walEditKVs = 100; + for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); } + log.rollWriter(); + logManager.openReader(path); + logManager.seek(); + for (int i = 0; i < nbRows; i++) { + HLog.Entry e = logManager.readNextAndSetPosition(entriesArray, 0); + if (e == null) { + fail("Should have enough entries"); + } + + } } private void appendToLog() throws IOException { - log.append(info, tableName, getWALEdit(), System.currentTimeMillis(), htd); + appendToLogPlus(1); } - private WALEdit getWALEdit() { + private void appendToLogPlus(int count) throws IOException { + log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd); + } + + private WALEdit getWALEdits(int count) { WALEdit edit = new WALEdit(); - edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, + for (int i = 0; i < count; i++) { + edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, System.currentTimeMillis(), qualifier)); + } return edit; }