Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java (revision 1511891)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java (working copy)
@@ -18,12 +18,10 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -44,18 +42,23 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import static org.junit.Assert.*;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 @Category(MediumTests.class)
+@RunWith(Parameterized.class)
 public class TestReplicationHLogReaderManager {
 
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static HBaseTestingUtility TEST_UTIL;
   private static Configuration conf;
   private static Path hbaseDir;
   private static FileSystem fs;
@@ -70,13 +73,44 @@
   private HLog log;
   private ReplicationHLogReaderManager logManager;
   private PathWatcher pathWatcher;
+  private int nbRows;
+  private int walEditKVs;
 
+  @Parameters
+  public static Collection parameters() {
+    // Try out different combinations of row count and KeyValue count
+    int[] NB_ROWS = { 1500, 60000 };
+    int[] NB_KVS = { 1, 100 };
+    // whether compression is used
+    Boolean[] BOOL_VALS = { false, true };
+    List parameters = new ArrayList();
+    for (int nbRows : NB_ROWS) {
+      for (int walEditKVs : NB_KVS) {
+        for (boolean b : BOOL_VALS) {
+          Object[] arr = new Object[3];
+          arr[0] = nbRows;
+          arr[1] = walEditKVs;
+          arr[2] = b;
+          parameters.add(arr);
+        }
+      }
+    }
+    return parameters;
+  }
+
+  public TestReplicationHLogReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
+    this.nbRows = nbRows;
+    this.walEditKVs = walEditKVs;
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
+        enableCompression);
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    conf = TEST_UTIL.getConfiguration();
     TEST_UTIL.startMiniDFSCluster(3);
-    conf = TEST_UTIL.getConfiguration();
     hbaseDir = TEST_UTIL.createRootDir();
     cluster = TEST_UTIL.getDFSCluster();
     fs = cluster.getFileSystem();
@@ -156,16 +190,32 @@
       fail();
     } catch (EOFException ex) {}
 
+    for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
+    log.rollWriter();
+    logManager.openReader(path);
+    logManager.seek();
+    for (int i = 0; i < nbRows; i++) {
+      HLog.Entry e = logManager.readNextAndSetPosition(entriesArray, 0);
+      if (e == null) {
+        fail("Should have enough entries");
+      }
+    }
   }
 
   private void appendToLog() throws IOException {
-    log.append(info, tableName, getWALEdit(), System.currentTimeMillis(), htd);
+    appendToLogPlus(1);
   }
 
-  private WALEdit getWALEdit() {
+  private void appendToLogPlus(int count) throws IOException {
+    log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd);
+  }
+
+  private WALEdit getWALEdits(int count) {
     WALEdit edit = new WALEdit();
-    edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
+    for (int i = 0; i < count; i++) {
+      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
         System.currentTimeMillis(), qualifier));
+    }
     return edit;
   }
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (revision 1511891)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (working copy)
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
@@ -219,7 +220,7 @@
       pos += elemLen;
 
       // the rest
-      in.read(backingArray, pos, length - pos);
+      IOUtils.readFully(in, backingArray, pos, length - pos);
       return new KeyValue(backingArray);
     }
 
@@ -229,7 +230,7 @@
       // status byte indicating that data to be read is not in dictionary.
       // if this isn't in the dictionary, we need to add to the dictionary.
       int length = StreamUtils.readRawVarint32(in);
-      in.read(to, offset, length);
+      IOUtils.readFully(in, to, offset, length);
       dict.addEntry(to, offset, length);
       return length;
     } else {
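
Reviewer note on the WALCellCodec hunks: InputStream.read(byte[], int, int) is allowed to
return fewer bytes than requested, so the old single read() call could leave the tail of the
KeyValue backing array (or of a dictionary entry) unfilled and silently hand back a corrupt
cell; org.apache.hadoop.io.IOUtils.readFully instead loops until the requested length has
been read and throws EOFException if the stream ends early. The sketch below reproduces the
short-read hazard against a toy stream; PartialReadDemo, TricklingStream, and the hand-rolled
readFully loop are illustrative stand-ins for this note, not code from the patch.

    import java.io.ByteArrayInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    public class PartialReadDemo {

      // A stream that returns at most 4 bytes per read() call, which is
      // legal under the InputStream contract (DFS streams can behave
      // similarly near block and packet boundaries).
      static class TricklingStream extends ByteArrayInputStream {
        TricklingStream(byte[] buf) { super(buf); }
        @Override
        public int read(byte[] b, int off, int len) {
          return super.read(b, off, Math.min(len, 4));
        }
      }

      // Loops until len bytes have been read, mirroring what
      // org.apache.hadoop.io.IOUtils.readFully does in the patch.
      static void readFully(InputStream in, byte[] buf, int off, int len)
          throws IOException {
        while (len > 0) {
          int n = in.read(buf, off, len);
          if (n < 0) throw new EOFException(len + " bytes short of requested length");
          off += n;
          len -= n;
        }
      }

      public static void main(String[] args) throws IOException {
        byte[] src = new byte[16];
        for (int i = 0; i < src.length; i++) src[i] = (byte) i;

        byte[] dst = new byte[16];
        int n = new TricklingStream(src).read(dst, 0, dst.length);
        // Prints 4: the unchecked single read() left 12 bytes unread,
        // exactly the situation the in.read() -> readFully change fixes.
        System.out.println("single read() returned " + n + " of 16 bytes");

        readFully(new TricklingStream(src), dst, 0, dst.length);
        System.out.println("readFully() filled all 16 bytes");
      }
    }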