From c0591701d8ae6268c19ded149c4588230e9c9240 Mon Sep 17 00:00:00 2001 From: Vladimir Rodionov Date: Wed, 7 Mar 2018 15:17:25 -0800 Subject: [PATCH] HBASE-17851: WAL to HFile conversion phase MUST detect and handle missing WAL files --- .../hbase/mapreduce/TestWALRecordReader.java | 82 ++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index e486714e99..d2e6151541 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; @@ -241,6 +242,47 @@ public class TestWALRecordReader { testSplit(splits.get(1)); } + + /** + * Test WALRecordReader tolerance to moving WAL from active + * to archive directory + * @throws Exception + */ + @Test + public void testWALRecordReaderActiveArchiveTolerance() throws Exception { + final WALFactory walfactory = new WALFactory(conf, getName()); + WAL log = walfactory.getWAL(info); + byte [] value = Bytes.toBytes("value"); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), + System.currentTimeMillis(), value)); + long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + log.sync(txid); + + Thread.sleep(10); // make sure 2nd edit gets a later timestamp + + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), + System.currentTimeMillis(), value)); + txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + log.sync(txid); + log.shutdown(); + //walfactory.shutdown(); + + // should have 2 log entries now + WALInputFormat input = new WALInputFormat(); + Configuration jobConf = new Configuration(conf); + jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); + // make sure log is found + List splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); + assertEquals(1, splits.size()); + WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0); + LOG.debug("DDDD log="+logDir+" file="+ split.getLogFileName()); + + testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2")); + + } + protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap scopes) { return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); } @@ -270,4 +312,44 @@ public class TestWALRecordReader { assertFalse(reader.nextKeyValue()); reader.close(); } + + /** + * Create a new reader from the split, match the edits against the passed columns, + * moving WAL to archive in between readings + */ + private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception { + WALRecordReader reader = getReader(); + reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); + + assertTrue(reader.nextKeyValue()); + Cell cell = reader.getCurrentValue().getCells().get(0); + if (!Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength())) { + assertTrue( + "expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString( + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", + false); + } + // Move log file to archive directory + // While WAL record reader is open + WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split; + + Path logFile = new Path(split_.getLogFileName()); + Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); + boolean result = fs.rename(logFile, archivedLog); + assertTrue(result); + result = fs.exists(archivedLog); + assertTrue(result); + + assertTrue(reader.nextKeyValue()); + cell = reader.getCurrentValue().getCells().get(0); + if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength())) { + assertTrue( + "expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString( + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", + false); + } + reader.close(); + } } -- 2.11.0 (Apple Git-81)