From bc6a0e90f960647911d1545f284ec8d7c8b0b33d Mon Sep 17 00:00:00 2001 From: "Apekshit(Appy) Sharma" Date: Tue, 30 Jun 2015 18:33:18 -0700 Subject: [PATCH] HBASE-13702 Add dry mode and invalid row logging to ImportTsv. (Apekshit) --- .../hbase/mapreduce/IntegrationTestImportTsv.java | 23 +- .../apache/hadoop/hbase/mapreduce/ImportTsv.java | 85 ++++-- .../hadoop/hbase/mapreduce/TsvImporterMapper.java | 22 +- .../hbase/mapreduce/TsvImporterTextMapper.java | 17 +- .../hadoop/hbase/mapreduce/TestImportTsv.java | 318 +++++++++++++-------- 5 files changed, 297 insertions(+), 168 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java index da6f68a..7a1dee6 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java @@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.UUID; @@ -38,11 +40,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -187,19 +189,18 @@ public class IntegrationTestImportTsv implements Configurable, Tool { Path hfiles = new Path( util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles"); - String[] args = { - format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles), - format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", - ImportTsv.COLUMNS_CONF_KEY, cf, cf), - // configure the test harness to NOT delete the HFiles after they're - // generated. We need those for doLoadIncrementalHFiles - format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF), - table.getNameAsString() - }; + + Map args = new HashMap(); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.COLUMNS_CONF_KEY, + format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf)); + // configure the test harness to NOT delete the HFiles after they're + // generated. We need those for doLoadIncrementalHFiles + args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false"); // run the job, complete the load. 
util.createTable(table, new String[]{cf}); - Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args); + Tool t = TestImportTsv.doMROnTableTest(util, table.getNameAsString(), cf, simple_tsv, args); doLoadIncrementalHFiles(hfiles, table); // validate post-conditions diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 90f2f0e..5d22e27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -53,6 +54,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; @@ -86,6 +88,9 @@ public class ImportTsv extends Configured implements Tool { public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; // TODO: the rest of these configs are used exclusively by TsvImporterMapper. // Move them out of the tool and let the mapper handle its own validation. + public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run"; + // If true, bad lines are logged to stderr. Default: false. + public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines"; public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines"; public final static String COLUMNS_CONF_KEY = "importtsv.columns"; public final static String SEPARATOR_CONF_KEY = "importtsv.separator"; @@ -99,6 +104,11 @@ public class ImportTsv extends Configured implements Tool { final static Class DEFAULT_MAPPER = TsvImporterMapper.class; public final static String CREATE_TABLE_CONF_KEY = "create.table"; public final static String NO_STRICT_COL_FAMILY = "no.strict"; + /** + * If table didn't exist and was created in dry-run mode, this flag is + * flipped to delete it when MR ends. + */ + private static boolean dryRunTableCreated; public static class TsvParser { /** @@ -450,9 +460,10 @@ public class ImportTsv extends Configured implements Tool { * @return The newly created job. * @throws IOException When setting up the job fails. 
*/ - public static Job createSubmittableJob(Configuration conf, String[] args) + protected static Job createSubmittableJob(Configuration conf, String[] args) throws IOException, ClassNotFoundException { Job job = null; + boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false); try (Connection connection = ConnectionFactory.createConnection(conf)) { try (Admin admin = connection.getAdmin()) { // Support non-XML supported characters @@ -476,6 +487,7 @@ public class ImportTsv extends Configured implements Tool { FileInputFormat.setInputPaths(job, inputDir); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(mapperClass); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); String columns[] = conf.getStrings(COLUMNS_CONF_KEY); if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) { @@ -486,13 +498,19 @@ public class ImportTsv extends Configured implements Tool { if (hfileOutPath != null) { if (!admin.tableExists(tableName)) { - String errorMsg = format("Table '%s' does not exist.", tableName); + LOG.warn(format("Table '%s' does not exist.", tableName)); if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) { - LOG.warn(errorMsg); // TODO: this is backwards. Instead of depending on the existence of a table, // create a sane splits file for HFileOutputFormat based on data sampling. createTable(admin, tableName, columns); + if (isDryRun) { + LOG.warn("Dry run: Table will be deleted at end of dry run."); + dryRunTableCreated = true; + } } else { + String errorMsg = + format("Table '%s' does not exist and '%s' is set to no.", tableName, + CREATE_TABLE_CONF_KEY); LOG.error(errorMsg); throw new TableNotFoundException(errorMsg); } @@ -523,21 +541,22 @@ public class ImportTsv extends Configured implements Tool { + "=true.\n"; usage(msg); System.exit(-1); - } + } } - job.setReducerClass(PutSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); if (mapperClass.equals(TsvImporterTextMapper.class)) { job.setMapOutputValueClass(Text.class); job.setReducerClass(TextSortReducer.class); } else { job.setMapOutputValueClass(Put.class); job.setCombinerClass(PutCombiner.class); + job.setReducerClass(PutSortReducer.class); + } + if (!isDryRun) { + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), + regionLocator); } - HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), - regionLocator); } } else { if (!admin.tableExists(tableName)) { @@ -552,13 +571,20 @@ public class ImportTsv extends Configured implements Tool { + " or custom mapper whose value type is Put."); System.exit(-1); } - // No reducers. Just write straight to table. Call initTableReducerJob - // to set up the TableOutputFormat. - TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, - job); + if (!isDryRun) { + // No reducers. Just write straight to table. Call initTableReducerJob + // to set up the TableOutputFormat. 
+ TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job); + } job.setNumReduceTasks(0); } - + if (isDryRun) { + job.setOutputFormatClass(NullOutputFormat.class); + job.getConfiguration().setStrings("io.serializations", + job.getConfiguration().get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + } TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job.getConfiguration(), com.google.common.base.Function.class /* Guava used by TsvParser */); @@ -579,7 +605,24 @@ public class ImportTsv extends Configured implements Tool { tableName, cfSet)); admin.createTable(htd); } - + + private static void deleteTable(Configuration conf, String[] args) { + TableName tableName = TableName.valueOf(args[0]); + try (Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin()) { + try { + admin.disableTable(tableName); + } catch (TableNotEnabledException e) { + LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it."); + } + admin.deleteTable(tableName); + } catch (IOException e) { + LOG.error(format("***Dry run: Failed to delete table '%s'.***\n%s", tableName, e.toString())); + return; + } + LOG.info(format("Dry run: Deleted table '%s'.", tableName)); + } + private static Set getColumnFamilies(String[] columns) { Set cfSet = new HashSet(); for (String aColumn : columns) { @@ -630,7 +673,10 @@ public class ImportTsv extends Configured implements Tool { " Note: if you do not use this option, then the target table must already exist in HBase\n" + "\n" + "Other options that may be specified with -D include:\n" + + " -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" + + " table. If table does not exist, it is created but deleted in the end.\n" + " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" + + " -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" + " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" + " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" + " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + @@ -717,8 +763,13 @@ public class ImportTsv extends Configured implements Tool { // system time getConf().setLong(TIMESTAMP_CONF_KEY, timstamp); + dryRunTableCreated = false; Job job = createSubmittableJob(getConf(), otherArgs); - return job.waitForCompletion(true) ? 0 : 1; + boolean success = job.waitForCompletion(true); + if (dryRunTableCreated) { + deleteTable(getConf(), args); + } + return success ? 
0 : 1; } public static void main(String[] args) throws Exception { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java index 270de75..9f1b4c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java @@ -57,6 +57,7 @@ extends Mapper /** Should skip bad lines */ private boolean skipBadLines; private Counter badLineCount; + private boolean logBadLines; protected ImportTsv.TsvParser parser; @@ -129,6 +130,7 @@ extends Mapper skipBadLines = context.getConfiguration().getBoolean( ImportTsv.SKIP_LINES_CONF_KEY, true); badLineCount = context.getCounter("ImportTsv", "Bad Lines"); + logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false); hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY); } @@ -163,26 +165,16 @@ extends Mapper populatePut(lineBytes, parsed, put, i); } context.write(rowKey, put); - } catch (ImportTsv.TsvParser.BadTsvLineException badLine) { - if (skipBadLines) { - System.err.println( - "Bad line at offset: " + offset.get() + ":\n" + - badLine.getMessage()); - incrementBadLineCount(1); - return; - } else { - throw new IOException(badLine); + } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) { + if (logBadLines) { + System.err.println(value); } - } catch (IllegalArgumentException e) { + System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); if (skipBadLines) { - System.err.println( - "Bad line at offset: " + offset.get() + ":\n" + - e.getMessage()); incrementBadLineCount(1); return; - } else { - throw new IOException(e); } + throw new IOException(badLine); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java index 9d97cab..7744ea7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java @@ -45,6 +45,7 @@ extends Mapper /** Should skip bad lines */ private boolean skipBadLines; private Counter badLineCount; + private boolean logBadLines; private ImportTsv.TsvParser parser; @@ -97,6 +98,7 @@ extends Mapper } skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); + logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false); badLineCount = context.getCounter("ImportTsv", "Bad Lines"); } @@ -110,21 +112,16 @@ extends Mapper ImmutableBytesWritable rowKey = new ImmutableBytesWritable( value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond()); context.write(rowKey, value); - } catch (ImportTsv.TsvParser.BadTsvLineException badLine) { + } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) { + if (logBadLines) { + System.err.println(value); + } + System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); if (skipBadLines) { - System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); incrementBadLineCount(1); return; } throw new IOException(badLine); - } catch (IllegalArgumentException e) { - if (skipBadLines) { - 
System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage()); - incrementBadLineCount(1); - return; - } else { - throw new IOException(e); - } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 2ad796a..d94b08f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -19,13 +19,15 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; @@ -53,13 +55,17 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; @Category(LargeTests.class) public class TestImportTsv implements Configurable { @@ -68,10 +74,7 @@ public class TestImportTsv implements Configurable { protected static final String NAME = TestImportTsv.class.getSimpleName(); protected static HBaseTestingUtility util = new HBaseTestingUtility(); - /** - * Delete the tmp directory after running doMROnTableTest. Boolean. Default is - * false. - */ + // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true. protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; /** @@ -80,6 +83,11 @@ public class TestImportTsv implements Configurable { protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; private final String FAMILY = "FAM"; + private String table; + private Map args; + + @Rule + public ExpectedException exception = ExpectedException.none(); public Configuration getConf() { return util.getConfiguration(); @@ -101,112 +109,80 @@ public class TestImportTsv implements Configurable { util.shutdownMiniCluster(); } - @Test - public void testMROnTable() throws Exception { - String table = "test-" + UUID.randomUUID(); - + @Before + public void setup() throws Exception { + table = "test-" + UUID.randomUUID(); + args = new HashMap(); // Prepare the arguments required for the test. 
- String[] args = new String[] { - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - table - }; + args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B"); + args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b"); + } + @Test + public void testMROnTable() throws Exception { util.createTable(TableName.valueOf(table), FAMILY); - doMROnTableTest(util, FAMILY, null, args, 1); + doMROnTableTest(null, 1); util.deleteTable(table); } @Test public void testMROnTableWithTimestamp() throws Exception { - String table = "test-" + UUID.randomUUID(); - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.COLUMNS_CONF_KEY - + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", - table - }; + util.createTable(TableName.valueOf(table), FAMILY); + args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); + args.put(ImportTsv.SEPARATOR_CONF_KEY, ","); String data = "KEY,1234,VALUE1,VALUE2\n"; - util.createTable(TableName.valueOf(table), FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1); + doMROnTableTest(data, 1); util.deleteTable(table); } - @Test public void testMROnTableWithCustomMapper() throws Exception { - String table = "test-" + UUID.randomUUID(); - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper", - table - }; - util.createTable(TableName.valueOf(table), FAMILY); - doMROnTableTest(util, FAMILY, null, args, 3); + args.put(ImportTsv.MAPPER_CONF_KEY, + "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper"); + + doMROnTableTest(null, 3); util.deleteTable(table); } @Test public void testBulkOutputWithoutAnExistingTable() throws Exception { - String table = "test-" + UUID.randomUUID(); - // Prepare the arguments required for the test. Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); - String[] args = new String[] { - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), - table - }; + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); - doMROnTableTest(util, FAMILY, null, args, 3); + doMROnTableTest(null, 3); util.deleteTable(table); } @Test public void testBulkOutputWithAnExistingTable() throws Exception { - String table = "test-" + UUID.randomUUID(); + util.createTable(TableName.valueOf(table), FAMILY); // Prepare the arguments required for the test. Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); - String[] args = new String[] { - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), - table - }; + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); - util.createTable(TableName.valueOf(table), FAMILY); - doMROnTableTest(util, FAMILY, null, args, 3); + doMROnTableTest(null, 3); util.deleteTable(table); } @Test public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception { - String table = "test-" + UUID.randomUUID(); + util.createTable(TableName.valueOf(table), FAMILY); + // Prepare the arguments required for the test. 
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); - String[] args = new String[] { - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), - "-D" + ImportTsv.NO_STRICT_COL_FAMILY + "=true", - table - }; - util.createTable(TableName.valueOf(table), FAMILY); - doMROnTableTest(util, FAMILY, null, args, 3); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true"); + doMROnTableTest(null, 3); util.deleteTable(table); } @Test public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception { - String table = "test-" + UUID.randomUUID(); Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles"); String INPUT_FILE = "InputFile1.csv"; // Prepare the arguments required for the test. @@ -226,53 +202,150 @@ public class TestImportTsv implements Configurable { assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class)); assertTrue(job.getReducerClass().equals(TextSortReducer.class)); assertTrue(job.getMapOutputValueClass().equals(Text.class)); + // Delete table created by createSubmittableJob. + util.deleteTable(table); } @Test public void testBulkOutputWithTsvImporterTextMapper() throws Exception { - String table = "test-" + UUID.randomUUID(); - String FAMILY = "FAM"; Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles"); - // Prepare the arguments required for the test. - String[] args = - new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", - "-D" + ImportTsv.COLUMNS_CONF_KEY - + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table - }; + args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); String data = "KEY\u001bVALUE4\u001bVALUE8\n"; - doMROnTableTest(util, FAMILY, data, args, 4); + doMROnTableTest(data, 4); + util.deleteTable(table); } - @Test(expected = TableNotFoundException.class) + @Test public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { - String table = "test-" + UUID.randomUUID(); - String[] args = - new String[] { table, "/inputFile" }; + String[] args = new String[] { table, "/inputFile" }; Configuration conf = new Configuration(util.getConfiguration()); conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A"); conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output"); conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no"); - ImportTsv.createSubmittableJob(conf, args); + exception.expect(TableNotFoundException.class); + assertEquals("running test job configuration failed.", 0, + ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() { + @Override public int run(String[] args) throws Exception { + createSubmittableJob(getConf(), args); + return 0; + } + }, args)); } - @Test(expected = TableNotFoundException.class) + @Test public void testMRWithoutAnExistingTable() throws Exception { - String table = "test-" + UUID.randomUUID(); String[] args = new String[] { table, "/inputFile" }; - Configuration conf = new Configuration(util.getConfiguration()); - ImportTsv.createSubmittableJob(conf, args); + exception.expect(TableNotFoundException.class); + assertEquals("running test job configuration failed.", 0, 
ToolRunner.run( + new Configuration(util.getConfiguration()), + new ImportTsv() { + @Override + public int run(String[] args) throws Exception { + createSubmittableJob(getConf(), args); + return 0; + } + }, args)); + } + + @Test + public void testJobConfigurationsWithDryMode() throws Exception { + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles"); + String INPUT_FILE = "InputFile1.csv"; + // Prepare the arguments required for the test. + String[] argsArray = new String[] { + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), + "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", + table, + INPUT_FILE }; + assertEquals("running test job configuration failed.", 0, ToolRunner.run( + new Configuration(util.getConfiguration()), + new ImportTsv() { + @Override + public int run(String[] args) throws Exception { + Job job = createSubmittableJob(getConf(), args); + assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class)); + return 0; + } + }, argsArray)); + // Delete table created by createSubmittableJob. + util.deleteTable(table); } - protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, - String data, String[] args) throws Exception { - return doMROnTableTest(util, family, data, args, 1); + @Test + public void testDryModeWithoutBulkOutputAndTableExists() throws Exception { + util.createTable(TableName.valueOf(table), FAMILY); + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + doMROnTableTest(null, 1); + // Dry mode should not delete an existing table. If it's not present, + // this will throw TableNotFoundException. + util.deleteTable(table); + } + + /** + * If table is not present in non-bulk mode, dry run should fail just like + * normal mode. + */ + @Test + public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception { + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + exception.expect(TableNotFoundException.class); + doMROnTableTest(null, 1); + } + + @Test public void testDryModeWithBulkOutputAndTableExists() throws Exception { + util.createTable(TableName.valueOf(table), FAMILY); + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + doMROnTableTest(null, 1); + // Dry mode should not delete an existing table. If it's not present, + // this will throw TableNotFoundException. + util.deleteTable(table); + } + + /** + * If table is not present in bulk mode and create.table is not set to yes, + * import should fail with TableNotFoundException. + */ + @Test + public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws + Exception { + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no"); + exception.expect(TableNotFoundException.class); + doMROnTableTest(null, 1); + } + + @Test + public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception { + // Prepare the arguments required for the test. 
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes"); + doMROnTableTest(null, 1); + // Verify temporary table was deleted. + exception.expect(TableNotFoundException.class); + util.deleteTable(table); + } + + private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception { + return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier); + } + + protected static Tool doMROnTableTest(HBaseTestingUtility util, String table, + String family, String data, Map args) throws Exception { + return doMROnTableTest(util, table, family, data, args, 1); } /** @@ -283,10 +356,9 @@ public class TestImportTsv implements Configurable { * @param args Any arguments to pass BEFORE inputFile path is appended. * @return The Tool instance used to run the test. */ - protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, - String data, String[] args, int valueMultiplier) + protected static Tool doMROnTableTest(HBaseTestingUtility util, String table, + String family, String data, Map args, int valueMultiplier) throws Exception { - String table = args[args.length - 1]; Configuration conf = new Configuration(util.getConfiguration()); // populate input file @@ -305,32 +377,40 @@ public class TestImportTsv implements Configurable { conf.setInt("mapreduce.map.combine.minspills", 1); } + // Build args array. + String[] argsArray = new String[args.size() + 2]; + Iterator it = args.entrySet().iterator(); + int i = 0; + while (it.hasNext()) { + Map.Entry pair = (Map.Entry) it.next(); + argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue(); + i++; + } + argsArray[i] = table; + argsArray[i + 1] = inputPath.toString(); + // run the import - List argv = new ArrayList(Arrays.asList(args)); - argv.add(inputPath.toString()); Tool tool = new ImportTsv(); - LOG.debug("Running ImportTsv with arguments: " + argv); - assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); + LOG.debug("Running ImportTsv with arguments: " + argsArray); + assertEquals(0, ToolRunner.run(conf, tool, argsArray)); // Perform basic validation. If the input args did not include // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table. // Otherwise, validate presence of hfiles. 
- boolean createdHFiles = false; - String outputPath = null; - for (String arg : argv) { - if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) { - createdHFiles = true; - // split '-Dfoo=bar' on '=' and keep 'bar' - outputPath = arg.split("=")[1]; - break; + boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) && + "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY)); + if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) { + if (isDryRun) { + assertFalse(String.format("Dry run mode, %s should not have been created.", + ImportTsv.BULK_OUTPUT_CONF_KEY), + fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY))); + } else { + validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family); } + } else { + validateTable(conf, TableName.valueOf(table), family, valueMultiplier, isDryRun); } - if (createdHFiles) - validateHFiles(fs, outputPath, family); - else - validateTable(conf, TableName.valueOf(table), family, valueMultiplier); - if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { LOG.debug("Deleting test subdirectory"); util.cleanupDataTestDirOnTestFS(table); @@ -342,7 +422,7 @@ public class TestImportTsv implements Configurable { * Confirm ImportTsv via data in online table. */ private static void validateTable(Configuration conf, TableName tableName, - String family, int valueMultiplier) throws IOException { + String family, int valueMultiplier, boolean isDryRun) throws IOException { LOG.debug("Validating table."); Table table = new HTable(conf, tableName); @@ -355,8 +435,10 @@ public class TestImportTsv implements Configurable { // Scan entire family. scan.addFamily(Bytes.toBytes(family)); ResultScanner resScanner = table.getScanner(scan); + int numRows = 0; for (Result res : resScanner) { - assertTrue(res.size() == 2); + numRows++; + assertEquals(2, res.size()); List kvs = res.listCells(); assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY"))); assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY"))); @@ -364,6 +446,11 @@ public class TestImportTsv implements Configurable { assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier))); // Only one result set is expected, so let it loop. } + if (isDryRun) { + assertEquals(0, numRows); + } else { + assertEquals(1, numRows); + } verified = true; break; } catch (NullPointerException e) { @@ -385,7 +472,6 @@ public class TestImportTsv implements Configurable { */ private static void validateHFiles(FileSystem fs, String outputPath, String family) throws IOException { - // validate number and content of output columns LOG.debug("Validating HFiles."); Set configFamilies = new HashSet(); @@ -397,7 +483,7 @@ public class TestImportTsv implements Configurable { foundFamilies.add(cf); assertTrue( String.format( - "HFile ouput contains a column family (%s) not present in input families (%s)", + "HFile output contains a column family (%s) not present in input families (%s)", cf, configFamilies), configFamilies.contains(cf)); for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { @@ -406,6 +492,8 @@ public class TestImportTsv implements Configurable { hfile.getLen() > 0); } } + assertTrue(String.format("HFile output does not contain the input family '%s'.", family), + foundFamilies.contains(family)); } } -- 2.3.2 (Apple Git-55)