From 55168d5cefd6f5cbacfb6681da228a8ee303b5bd Mon Sep 17 00:00:00 2001 From: Bhupendra Date: Mon, 14 Sep 2015 15:23:16 +0530 Subject: [PATCH] HBASE-14380 Correct data also getting skipped along with bad data in importTsv bulk load thru TsvImporterTextMapper --- .../hadoop/hbase/mapreduce/TextSortReducer.java | 4 +- .../hadoop/hbase/mapreduce/TestImportTsv.java | 76 +++++++++++++++++++--- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java index b3981a1..62b62f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java @@ -188,14 +188,14 @@ public class TextSortReducer extends if (skipBadLines) { System.err.println("Bad line." + badLine.getMessage()); incrementBadLineCount(1); - return; + continue; } throw new IOException(badLine); } catch (IllegalArgumentException e) { if (skipBadLines) { System.err.println("Bad line." 
+ e.getMessage()); incrementBadLineCount(1); - return; + continue; } throw new IOException(e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 2ad796a..b613823 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.HTable; @@ -49,6 +48,10 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; @@ -270,11 +273,38 @@ public class TestImportTsv implements Configurable { ImportTsv.createSubmittableJob(conf, args); } + /** + * If there are invalid data rows as inputs, then only those rows should be ignored. + */ + @Test + public void testTsvImporterTextMapperWithInvalidData() throws Exception { + String table = "test-" + UUID.randomUUID(); + String FAMILY = "FAM"; + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + // Prepare the arguments required for the test. 
+ String[] args = + new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table }; + // 3 Rows of data as input. 2 Rows are valid and 1 row is invalid as it doesn't have TS + String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n"; + doMROnTableTest(util, FAMILY, data, args, 1, 4); + util.deleteTable(table); + } + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args) throws Exception { return doMROnTableTest(util, family, data, args, 1); } + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, + String[] args, int valueMultiplier) throws Exception { + return doMROnTableTest(util, family, data, args, valueMultiplier, -1); + } + /** * Run an ImportTsv job and perform basic validation on the results. * Returns the ImportTsv Tool instance so that other tests can @@ -283,8 +313,8 @@ public class TestImportTsv implements Configurable { * @param args Any arguments to pass BEFORE inputFile path is appended. * @return The Tool instance used to run the test. 
*/ - protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, - String data, String[] args, int valueMultiplier) + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, + String[] args, int valueMultiplier, int expectedKVCount) throws Exception { String table = args[args.length - 1]; Configuration conf = new Configuration(util.getConfiguration()); @@ -327,7 +357,7 @@ public class TestImportTsv implements Configurable { } if (createdHFiles) - validateHFiles(fs, outputPath, family); + validateHFiles(fs, outputPath, family, expectedKVCount); else validateTable(conf, TableName.valueOf(table), family, valueMultiplier); @@ -383,29 +413,59 @@ public class TestImportTsv implements Configurable { /** * Confirm ImportTsv via HFiles on fs. */ - private static void validateHFiles(FileSystem fs, String outputPath, String family) - throws IOException { - + private static void validateHFiles(FileSystem fs, String outputPath, String family, + int expectedKVCount) throws IOException { // validate number and content of output columns LOG.debug("Validating HFiles."); Set configFamilies = new HashSet(); configFamilies.add(family); Set foundFamilies = new HashSet(); + int actualKVCount = 0; for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); String cf = elements[elements.length - 1]; foundFamilies.add(cf); assertTrue( String.format( - "HFile ouput contains a column family (%s) not present in input families (%s)", + "HFile output contains a column family (%s) not present in input families (%s)", cf, configFamilies), configFamilies.contains(cf)); for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { assertTrue( String.format("HFile %s appears to contain no data.", hfile.getPath()), hfile.getLen() > 0); + // count the number of KVs from all the hfiles + if (expectedKVCount > -1) { + actualKVCount += 
getKVCountFromHfile(fs, hfile.getPath()); + } } } + if (expectedKVCount > -1) { + assertTrue(String.format( + "KV count in output hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount, + expectedKVCount), actualKVCount == expectedKVCount); + } + } + + /** + * Method returns the total KVs in given hfile + * @param fs File System + * @param p HFile path + * @return KV count in the given hfile + * @throws IOException + */ + private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { + Configuration conf = util.getConfiguration(); + HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf); + reader.loadFileInfo(); + HFileScanner scanner = reader.getScanner(false, false); + scanner.seekTo(); + int count = 0; + do { + count++; + } while (scanner.next()); + reader.close(); + return count; } } -- 1.9.2.msysgit.0