From 791a5cf3315ceaf94f584d05c7f645d651c3308e Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Thu, 14 Mar 2013 12:15:05 -0700
Subject: [PATCH] HBASE-7938 Add integration test for ImportTsv + LoadIncrementalHFiles

The new IntegrationTestImportTsv is written in the style of the recently
added IntegrationTestLoadAndVerify. It is designed to be run from maven or
directly against a distributed cluster. It shares core test logic with
TestImportTsv.

To run it with maven, first refresh your installed jars, and then run the test:

  $ mvn clean package install -DskipTests
  $ cd hbase-it
  $ mvn failsafe:integration-test -Dit.test=IntegrationTestImportTsv

NOTE: Running from maven fails because HFileOutputFormat is creating the
partition file on localFs, not HDFS.

Or run it stand-alone:

  $ HBASE_CLASSPATH="hbase-it/target/hbase-it--tests.jar" \
      ./bin/hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestImportTsv

When run stand-alone, it supports all the usual Hadoop Tool arguments,
particularly -conf.

TestImportTsv was refactored to make code sharing easier. It is upgraded
from MediumTests to LargeTests because it takes 180+ seconds to run on my
machine. Tests for the TsvParser are separated into their own SmallTest.
---
 .../hbase/mapreduce/IntegrationTestImportTsv.java | 191 ++++
 .../apache/hadoop/hbase/HBaseTestingUtility.java | 61 ++-
 .../hadoop/hbase/mapreduce/TestImportTsv.java | 479 ++++----
 .../hbase/mapreduce/TestImportTsvParser.java | 171 ++++
 4 files changed, 643 insertions(+), 259 deletions(-)
 create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
new file mode 100644
index 0000000..5b14deb
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -0,0 +1,191 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static java.lang.String.format;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.IntegrationTests;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Validate ImportTsv + LoadIncrementalHFiles on a distributed cluster.
+ */ +@Category(IntegrationTests.class) +public class IntegrationTestImportTsv implements Configurable, Tool { + + private static final String NAME = IntegrationTestImportTsv.class.getSimpleName(); + protected static final Log LOG = LogFactory.getLog(IntegrationTestImportTsv.class); + + protected static final String simple_tsv = + "row1\t1\tc1\tc2\n" + + "row2\t1\tc1\tc2\n" + + "row3\t1\tc1\tc2\n" + + "row4\t1\tc1\tc2\n" + + "row5\t1\tc1\tc2\n" + + "row6\t1\tc1\tc2\n" + + "row7\t1\tc1\tc2\n" + + "row8\t1\tc1\tc2\n" + + "row9\t1\tc1\tc2\n" + + "row10\t1\tc1\tc2\n"; + + protected static final Set simple_expected = + new TreeSet(KeyValue.COMPARATOR) { + private static final long serialVersionUID = 1L; + { + byte[] family = Bytes.toBytes("d"); + for (String line : simple_tsv.split("\n")) { + String[] row = line.split("\t"); + byte[] key = Bytes.toBytes(row[0]); + long ts = Long.parseLong(row[1]); + byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) }; + add(new KeyValue(key, family, fields[0], ts, Type.Put, fields[0])); + add(new KeyValue(key, family, fields[1], ts, Type.Put, fields[1])); + } + } + }; + + // this instance is initialized on first access when the test is run from + // JUnit/Maven or by main when run from the CLI. + protected static IntegrationTestingUtility util = null; + + public Configuration getConf() { + return util.getConfiguration(); + } + + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); + } + + @BeforeClass + public static void provisionCluster() throws Exception { + if (null == util) { + util = new IntegrationTestingUtility(); + } + util.initializeCluster(1); + } + + @AfterClass + public static void releaseCluster() throws Exception { + util.restoreCluster(); + util = null; + } + + /** + * Verify the data described by simple_tsv matches + * simple_expected. + */ + protected void doLoadIncrementalHFiles(Path hfiles, String tableName) + throws Exception { + + String[] args = { hfiles.toString(), tableName }; + LOG.info(format("Running LoadIncrememntalHFiles with args: %s", Arrays.asList(args))); + assertEquals("Loading HFiles failed.", + 0, ToolRunner.run(new LoadIncrementalHFiles(new Configuration(getConf())), args)); + + HTable table = null; + Scan scan = new Scan() {{ + setCacheBlocks(false); + setCaching(1000); + }}; + try { + table = new HTable(getConf(), tableName); + Iterator resultsIt = table.getScanner(scan).iterator(); + Iterator expectedIt = simple_expected.iterator(); + while (resultsIt.hasNext() && expectedIt.hasNext()) { + Result r = resultsIt.next(); + for (KeyValue actual : r.raw()) { + assertTrue( + "Ran out of expected values prematurely!", + expectedIt.hasNext()); + KeyValue expected = expectedIt.next(); + assertTrue( + format("Scan produced surprising result. 
expected: <%s>, actual: %s", + expected, actual), + KeyValue.COMPARATOR.compare(expected, actual) == 0); + } + } + assertFalse("Did not consume all expected values.", expectedIt.hasNext()); + assertFalse("Did not consume all scan results.", resultsIt.hasNext()); + } finally { + if (null != table) table.close(); + } + } + + @Test + public void testGenerateAndLoad() throws Exception { + String table = NAME + "-" + UUID.randomUUID(); + String cf = "d"; + Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + + String[] args = { + format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles), + format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", + ImportTsv.COLUMNS_CONF_KEY, cf, cf), + // configure the test harness to NOT delete the HFiles after they're + // generated. We need those for doLoadIncrementalHFiles + format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF), + table + }; + + // run the job, complete the load. + util.createTable(table, cf); + TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args); + doLoadIncrementalHFiles(hfiles, table); + util.deleteTable(table); + util.cleanupDataTestDirOnTestFS(table); + } + + public int run(String[] args) throws Exception { + if (args.length != 0) { + System.err.println(format("%s [genericOptions]", NAME)); + System.err.println(" Runs ImportTsv integration tests against a distributed cluster."); + System.err.println(); + GenericOptionsParser.printGenericCommandUsage(System.err); + return 1; + } + + // adding more test methods? Don't forget to add them here... or consider doing what + // IntegrationTestsDriver does. + provisionCluster(); + testGenerateAndLoad(); + releaseCluster(); + + return 0; + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + util = new IntegrationTestingUtility(conf); + // not using ToolRunner to avoid unnecessary call to setConf() + args = new GenericOptionsParser(conf, args).getRemainingArgs(); + int status = new IntegrationTestImportTsv().run(args); + System.exit(status); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 5355c98..bd3bb63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -315,7 +315,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { /** * @return Where to write test data on the test filesystem; Returns working directory - * for the test filesytem by default + * for the test filesystem by default * @see #setupDataTestDirOnTestFS() * @see #getTestFileSystem() */ @@ -396,6 +396,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { FileSystem fs = getTestFileSystem(); if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { File dataTestDir = new File(getDataTestDir().toString()); + dataTestDir.deleteOnExit(); dataTestDirOnTestFS = new Path(dataTestDir.getAbsolutePath()); } else { Path base = getBaseTestDirOnTestFS(); @@ -404,6 +405,29 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { fs.deleteOnExit(dataTestDirOnTestFS); } } + + /** + * Cleans the test data directory on the test filesystem. 
+ * @return True if we removed the test dirs + * @throws IOException + */ + public boolean cleanupDataTestDirOnTestFS() throws IOException { + boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true); + if (ret) + dataTestDirOnTestFS = null; + return ret; + } + + /** + * Cleans a subdirectory under the test data directory on the test filesystem. + * @return True if we removed child + * @throws IOException + */ + public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException { + Path cpath = getDataTestDirOnTestFS(subdirName); + return getTestFileSystem().delete(cpath, true); + } + /** * Start a minidfscluster. * @param servers How many DNs to start. @@ -959,6 +983,33 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { getMiniHBaseCluster().compact(tableName, major); } + /** + * Create a table. + * @param tableName + * @param family + * @return An HTable instance for the created table. + * @throws IOException + */ + public HTable createTable(String tableName, String family) + throws IOException{ + return createTable(tableName, new String[] { family }); + } + + /** + * Create a table. + * @param tableName + * @param families + * @return An HTable instance for the created table. + * @throws IOException + */ + public HTable createTable(String tableName, String[] families) + throws IOException { + List fams = new ArrayList(families.length); + for (String family : families) { + fams.add(Bytes.toBytes(family)); + } + return createTable(Bytes.toBytes(tableName), fams.toArray(new byte[0][])); + } /** * Create a table. @@ -1121,6 +1172,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Drop an existing table * @param tableName existing table */ + public void deleteTable(String tableName) throws IOException { + deleteTable(Bytes.toBytes(tableName)); + } + + /** + * Drop an existing table + * @param tableName existing table + */ public void deleteTable(byte[] tableName) throws IOException { try { getHBaseAdmin().disableTable(tableName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 172b759..d6fa892 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -18,343 +18,306 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import java.io.UnsupportedEncodingException; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import 
org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; +@Category(LargeTests.class) +public class TestImportTsv implements Configurable { -@Category(MediumTests.class) -public class TestImportTsv { - private static final Log LOG = LogFactory.getLog(TestImportTsv.class); + protected static final Log LOG = LogFactory.getLog(TestImportTsv.class); + protected static final String NAME = TestImportTsv.class.getSimpleName(); + protected static HBaseTestingUtility util = new HBaseTestingUtility(); - @Test - public void testTsvParserSpecParsing() { - TsvParser parser; - - parser = new TsvParser("HBASE_ROW_KEY", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertFalse(parser.hasTimestamp()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertFalse(parser.hasTimestamp()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2)); - assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertFalse(parser.hasTimestamp()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", - "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3)); - assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertTrue(parser.hasTimestamp()); - assertEquals(2, parser.getTimestampKeyColumnIndex()); - } - - @Test - public void testTsvParser() throws BadTsvLineException { - TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t"); - assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0)); - assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("qual"), 
parser.getQualifier(1)); - assertNull(parser.getFamily(2)); - assertNull(parser.getQualifier(2)); - assertEquals(2, parser.getRowKeyColumnIndex()); - - assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser - .getTimestampKeyColumnIndex()); - - byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d"); - ParsedLine parsed = parser.parse(line, line.length); - checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); - } - - - @Test - public void testTsvParserWithTimestamp() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertNull(parser.getFamily(1)); - assertNull(parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2)); - assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertEquals(1, parser.getTimestampKeyColumnIndex()); - - byte[] line = Bytes.toBytes("rowkey\t1234\tval_a"); - ParsedLine parsed = parser.parse(line, line.length); - assertEquals(1234l, parsed.getTimestamp(-1)); - checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); - } - - private void checkParsing(ParsedLine parsed, Iterable expected) { - ArrayList parsedCols = new ArrayList(); - for (int i = 0; i < parsed.getColumnCount(); i++) { - parsedCols.add(Bytes.toString( - parsed.getLineBytes(), - parsed.getColumnOffset(i), - parsed.getColumnLength(i))); - } - if (!Iterables.elementsEqual(parsedCols, expected)) { - fail("Expected: " + Joiner.on(",").join(expected) + "\n" + - "Got:" + Joiner.on(",").join(parsedCols)); - } - } - - private void assertBytesEquals(byte[] a, byte[] b) { - assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b)); - } + /** + * Delete the tmp directory after running doMROnTableTest. Boolean. Default is + * false. + */ + protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; /** - * Test cases that throw BadTsvLineException + * Force use of combiner in doMROnTableTest. Boolean. Default is true. 
*/ - @Test(expected=BadTsvLineException.class) - public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes("val_a\tval_b\tval_c"); - parser.parse(line, line.length); - } + protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; - @Test(expected=BadTsvLineException.class) - public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes(""); - parser.parse(line, line.length); - } + private final String FAMILY = "FAM"; - @Test(expected=BadTsvLineException.class) - public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes("key_only"); - parser.parse(line, line.length); + public Configuration getConf() { + return util.getConfiguration(); } - @Test(expected=BadTsvLineException.class) - public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException { - TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t"); - byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key"); - parser.parse(line, line.length); + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); } - - @Test(expected = BadTsvLineException.class) - public void testTsvParserInvalidTimestamp() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); - assertEquals(1, parser.getTimestampKeyColumnIndex()); - byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a"); - ParsedLine parsed = parser.parse(line, line.length); - assertEquals(-1, parsed.getTimestamp(-1)); - checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); + + @BeforeClass + public static void provisionCluster() throws Exception { + util.startMiniCluster(); + util.startMiniMapReduceCluster(); } - - @Test(expected = BadTsvLineException.class) - public void testTsvParserNoTimestampValue() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t"); - assertEquals(2, parser.getTimestampKeyColumnIndex()); - byte[] line = Bytes.toBytes("rowkey\tval_a"); - parser.parse(line, line.length); + + @AfterClass + public static void releaseCluster() throws Exception { + util.shutdownMiniMapReduceCluster(); + util.shutdownMiniCluster(); } - @Test - public void testMROnTable() - throws Exception { - String TABLE_NAME = "TestTable"; - String FAMILY = "FAM"; - String INPUT_FILE = "InputFile.esv"; + public void testMROnTable() throws Exception { + String table = "test-" + UUID.randomUUID(); // Prepare the arguments required for the test. String[] args = new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - TABLE_NAME, - INPUT_FILE + table }; - doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1); + util.createTable(table, FAMILY); + doMROnTableTest(util, FAMILY, null, args, 1); + util.deleteTable(table); } @Test public void testMROnTableWithTimestamp() throws Exception { - String TABLE_NAME = "TestTable"; - String FAMILY = "FAM"; - String INPUT_FILE = "InputFile1.csv"; + String table = "test-" + UUID.randomUUID(); // Prepare the arguments required for the test. 
String[] args = new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE }; - + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", + table + }; String data = "KEY,1234,VALUE1,VALUE2\n"; - doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1); + + util.createTable(table, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(table); } @Test public void testMROnTableWithCustomMapper() throws Exception { - String TABLE_NAME = "TestTable"; - String FAMILY = "FAM"; - String INPUT_FILE = "InputFile2.esv"; + String table = "test-" + UUID.randomUUID(); // Prepare the arguments required for the test. String[] args = new String[] { "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper", - TABLE_NAME, - INPUT_FILE + table }; - doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3); + util.createTable(table, FAMILY); + doMROnTableTest(util, FAMILY, null, args, 3); + util.deleteTable(table); } + + @Test + public void testBulkOutputWithoutAnExistingTable() throws Exception { + String table = "test-" + UUID.randomUUID(); - private void doMROnTableTest(String inputFile, String family, - String tableName, String data, String[] args, int valueMultiplier) - throws Exception { - - // Cluster - HBaseTestingUtility htu1 = new HBaseTestingUtility(); - - htu1.startMiniCluster(); - htu1.startMiniMapReduceCluster(); - - Tool tool = new ImportTsv(); - tool.setConf(htu1.getConfiguration()); + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + String[] args = new String[] { + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + table + }; - try { - FileSystem fs = FileSystem.get(tool.getConf()); - FSDataOutputStream op = fs.create(new Path(inputFile), true); - if (data == null) { - data = "KEY\u001bVALUE1\u001bVALUE2\n"; - } - op.write(Bytes.toBytes(data)); - op.close(); - LOG.debug(String.format("Wrote test data to file: %s", fs.makeQualified(new Path(inputFile)))); - - if (tool.getConf().get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) { - HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(family)); - HBaseAdmin admin = new HBaseAdmin(tool.getConf()); - admin.createTable(desc); - admin.close(); - } - // force use of combiner for testing purposes - tool.getConf().setInt("min.num.spills.for.combine", 1); - assertEquals(0, ToolRunner.run(tool, args)); - - HTable table = new HTable(tool.getConf(), tableName); - boolean verified = false; - long pause = tool.getConf().getLong("hbase.client.pause", 5 * 1000); - int numRetries = tool.getConf().getInt("hbase.client.retries.number", 5); - for (int i = 0; i < numRetries; i++) { - try { - Scan scan = new Scan(); - // Scan entire family. 
- scan.addFamily(Bytes.toBytes(family)); - ResultScanner resScanner = table.getScanner(scan); - for (Result res : resScanner) { - assertTrue(res.size() == 2); - List kvs = res.list(); - assertEquals(toU8Str(kvs.get(0).getRow()), - toU8Str(Bytes.toBytes("KEY"))); - assertEquals(toU8Str(kvs.get(1).getRow()), - toU8Str(Bytes.toBytes("KEY"))); - assertEquals(toU8Str(kvs.get(0).getValue()), - toU8Str(Bytes.toBytes("VALUE" + valueMultiplier))); - assertEquals(toU8Str(kvs.get(1).getValue()), - toU8Str(Bytes.toBytes("VALUE" + 2*valueMultiplier))); - // Only one result set is expected, so let it loop. - } - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - table.close(); - assertTrue(verified); - } finally { - htu1.shutdownMiniMapReduceCluster(); - htu1.shutdownMiniCluster(); - } + doMROnTableTest(util, FAMILY, null, args, 3); + util.deleteTable(table); } - + @Test - public void testBulkOutputWithoutAnExistingTable() throws Exception { - String TABLE_NAME = "TestTable"; - String FAMILY = "FAM"; - String INPUT_FILE = "InputFile2.esv"; + public void testBulkOutputWithAnExistingTable() throws Exception { + String table = "test-" + UUID.randomUUID(); // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); String[] args = new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME, - INPUT_FILE }; - doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3); + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + table + }; + + util.createTable(table, FAMILY); + doMROnTableTest(util, FAMILY, null, args, 3); + util.deleteTable(table); } - public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException { - return new String(bytes); + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, + String data, String[] args) throws Exception { + return doMROnTableTest(util, family, data, args, 1); } + /** + * Run an ImportTsv job and perform basic validation on the results. + * Returns the ImportTsv Tool instance so that other tests can + * inspect it for further validation as necessary. This method is static to + * insure non-reliance on instance's util/conf facilities. + * @param args Any arguments to pass BEFORE inputFile path is appended. + * @return The Tool instance used to run the test. 
+ */ + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, + String data, String[] args, int valueMultiplier) + throws Exception { + String table = args[args.length - 1]; + Configuration conf = new Configuration(util.getConfiguration()); + + // populate input file + FileSystem fs = FileSystem.get(conf); + Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat")); + FSDataOutputStream op = fs.create(inputPath, true); + if (data == null) { + data = "KEY\u001bVALUE1\u001bVALUE2\n"; + } + op.write(Bytes.toBytes(data)); + op.close(); + LOG.debug(String.format("Wrote test data to file: %s", inputPath)); + + if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { + LOG.debug("Forcing combiner."); + conf.setInt("min.num.spills.for.combine", 1); + } + + // run the import + List argv = new ArrayList(Arrays.asList(args)); + argv.add(inputPath.toString()); + Tool tool = new ImportTsv(); + LOG.debug("Running ImportTsv with arguments: " + argv); + assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); + + // Perform basic validation. If the input args did not include + // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table. + // Otherwise, validate presence of hfiles. + boolean createdHFiles = false; + String outputPath = null; + for (String arg : argv) { + if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) { + createdHFiles = true; + // split '-Dfoo=bar' on '=' and keep 'bar' + outputPath = arg.split("=")[1]; + break; + } + } + + if (createdHFiles) + validateHFiles(fs, outputPath, family); + else + validateTable(conf, table, family, valueMultiplier); + + if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { + LOG.debug("Deleting test subdirectory"); + util.cleanupDataTestDirOnTestFS(table); + } + return tool; + } + + /** + * Confirm ImportTsv via data in online table. + */ + private static void validateTable(Configuration conf, String tableName, + String family, int valueMultiplier) throws IOException { + + LOG.debug("Validating table."); + HTable table = new HTable(conf, tableName); + boolean verified = false; + long pause = conf.getLong("hbase.client.pause", 5 * 1000); + int numRetries = conf.getInt("hbase.client.retries.number", 5); + for (int i = 0; i < numRetries; i++) { + try { + Scan scan = new Scan(); + // Scan entire family. + scan.addFamily(Bytes.toBytes(family)); + ResultScanner resScanner = table.getScanner(scan); + for (Result res : resScanner) { + assertTrue(res.size() == 2); + List kvs = res.list(); + assertArrayEquals(kvs.get(0).getRow(), Bytes.toBytes("KEY")); + assertArrayEquals(kvs.get(1).getRow(), Bytes.toBytes("KEY")); + assertArrayEquals(kvs.get(0).getValue(), + Bytes.toBytes("VALUE" + valueMultiplier)); + assertArrayEquals(kvs.get(1).getValue(), + Bytes.toBytes("VALUE" + 2 * valueMultiplier)); + // Only one result set is expected, so let it loop. + } + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + table.close(); + assertTrue(verified); + } + + /** + * Confirm ImportTsv via HFiles on fs. 
+ */ + private static void validateHFiles(FileSystem fs, String outputPath, String family) + throws IOException { + + // validate number and content of output columns + LOG.debug("Validating HFiles."); + Set configFamilies = new HashSet(); + configFamilies.add(family); + Set foundFamilies = new HashSet(); + for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { + String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); + String cf = elements[elements.length - 1]; + foundFamilies.add(cf); + assertTrue( + String.format( + "HFile ouput contains a column family (%s) not present in input families (%s)", + cf, configFamilies), + configFamilies.contains(cf)); + for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { + assertTrue( + String.format("HFile %s appears to contain no data.", hfile.getPath()), + hfile.getLen() > 0); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java new file mode 100644 index 0000000..e39ff53 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java @@ -0,0 +1,171 @@ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser; +import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; +import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; + +/** + * Tests for {@link TsvParser}. 
+ */ +@Category(SmallTests.class) +public class TestImportTsvParser { + + private void assertBytesEquals(byte[] a, byte[] b) { + assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b)); + } + + private void checkParsing(ParsedLine parsed, Iterable expected) { + ArrayList parsedCols = new ArrayList(); + for (int i = 0; i < parsed.getColumnCount(); i++) { + parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i), + parsed.getColumnLength(i))); + } + if (!Iterables.elementsEqual(parsedCols, expected)) { + fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:" + + Joiner.on(",").join(parsedCols)); + } + } + + @Test + public void testTsvParserSpecParsing() { + TsvParser parser; + + parser = new TsvParser("HBASE_ROW_KEY", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertFalse(parser.hasTimestamp()); + + parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertFalse(parser.hasTimestamp()); + + parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2)); + assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertFalse(parser.hasTimestamp()); + + parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3)); + assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertTrue(parser.hasTimestamp()); + assertEquals(2, parser.getTimestampKeyColumnIndex()); + } + + @Test + public void testTsvParser() throws BadTsvLineException { + TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t"); + assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0)); + assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1)); + assertNull(parser.getFamily(2)); + assertNull(parser.getQualifier(2)); + assertEquals(2, parser.getRowKeyColumnIndex()); + + assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, + parser.getTimestampKeyColumnIndex()); + + byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d"); + ParsedLine parsed = parser.parse(line, line.length); + checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); + } + + @Test + public void testTsvParserWithTimestamp() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertNull(parser.getFamily(1)); + assertNull(parser.getQualifier(1)); + 
assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2)); + assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertEquals(1, parser.getTimestampKeyColumnIndex()); + + byte[] line = Bytes.toBytes("rowkey\t1234\tval_a"); + ParsedLine parsed = parser.parse(line, line.length); + assertEquals(1234l, parsed.getTimestamp(-1)); + checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); + } + + /** + * Test cases that throw BadTsvLineException + */ + @Test(expected = BadTsvLineException.class) + public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); + byte[] line = Bytes.toBytes("val_a\tval_b\tval_c"); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); + byte[] line = Bytes.toBytes(""); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); + byte[] line = Bytes.toBytes("key_only"); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException { + TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t"); + byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key"); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserInvalidTimestamp() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); + assertEquals(1, parser.getTimestampKeyColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a"); + ParsedLine parsed = parser.parse(line, line.length); + assertEquals(-1, parsed.getTimestamp(-1)); + checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserNoTimestampValue() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t"); + assertEquals(2, parser.getTimestampKeyColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\tval_a"); + parser.parse(line, line.length); + } +} -- 1.8.1
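
For reviewers, a minimal usage sketch (not part of the patch) of the String-based
helpers added to HBaseTestingUtility above; the table name "exampleTable" and the
family "d" are hypothetical:

  // assumes a test method declared "throws Exception"
  HBaseTestingUtility util = new HBaseTestingUtility();
  util.startMiniCluster();
  // String-based createTable overload added by this patch; returns an HTable
  HTable table = util.createTable("exampleTable", "d");
  // ... load and verify data ...
  table.close();
  // String-based deleteTable overload added by this patch
  util.deleteTable("exampleTable");
  // removes this test's subdirectory under the data test dir on the test filesystem
  util.cleanupDataTestDirOnTestFS("exampleTable");
  util.shutdownMiniCluster();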