From c470385e143fc848ee2e121ee23ed1df8b979f06 Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Fri, 22 Mar 2013 09:43:23 -0700
Subject: [PATCH] HBASE-8147 Add test jobs launched based on other jobs

This integration test covers roughly the scenario used by HCatalog's
HBase bulk importer. Essentially, it runs job1 to create intermediate
data, and then runs job2 to load that data into HBase. The job object
for job2 is built based on job1's configuration.
---
 .../hbase/mapreduce/IntegrationTestImportTsv.java  | 163 +++++++++++++++++++++
 1 file changed, 163 insertions(+)
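For context, the chained-job scenario described above looks roughly like the
sketch below. It is illustrative only and not part of the patch: the method
name, job names, and paths are placeholders, and the real wiring lives in the
test code that follows.

    // A minimal sketch, assuming the mapreduce Job API used elsewhere in this patch.
    static void runChainedJobs(Configuration conf, Path input, Path intermediate)
        throws Exception {
      // job1: produce intermediate data.
      Job job1 = new Job(conf, "job1-generate-intermediate");
      job1.setJarByClass(IntegrationTestImportTsv.class);
      job1.setInputFormatClass(TextInputFormat.class);
      TextInputFormat.addInputPath(job1, input);
      FileOutputFormat.setOutputPath(job1, intermediate);
      // ship HBase jars once; they land in the distributed cache.
      TableMapReduceUtil.addDependencyJars(job1);
      if (!job1.waitForCompletion(true)) {
        throw new IOException("job1 failed");
      }

      // job2: built from job1's configuration, so distributed-cache entries
      // (e.g. mapred.job.classpath.archives) registered for job1 carry over.
      Job job2 = new Job(job1.getConfiguration(), "job2-load-into-hbase");
      job2.setInputFormatClass(TextInputFormat.class);
      TextInputFormat.addInputPath(job2, intermediate);
      // ... configure job2 to write into HBase, e.g. via ImportTsv, as the
      // test's OutputCommitter does below ...
      if (!job2.waitForCompletion(true)) {
        throw new IOException("job2 failed");
      }
    }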
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
index 49540e6..e5adc52 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -16,9 +16,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTests;
 import org.apache.hadoop.hbase.KeyValue;
@@ -27,6 +30,17 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
@@ -154,6 +168,7 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
 
   @Test
   public void testGenerateAndLoad() throws Exception {
+    LOG.info("Running test testGenerateAndLoad.");
     String table = NAME + "-" + UUID.randomUUID();
     String cf = "d";
     Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
@@ -179,6 +194,153 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     // clean up after ourselves.
     util.deleteTable(table);
     util.cleanupDataTestDirOnTestFS(table);
+    LOG.info("testGenerateAndLoad completed successfully.");
+  }
+
+  //
+  // helper classes used in the following test.
+  //
+
+  /**
+   * A {@link FileOutputCommitter} that launches an ImportTsv job through
+   * its {@link #commitJob(JobContext)} method.
+   */
+  private static class JobLaunchingOutputCommitter extends FileOutputCommitter {
+
+    public JobLaunchingOutputCommitter(Path outputPath, TaskAttemptContext context)
+        throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      super.commitJob(context);
+
+      // inherit jar dependencies added to distributed cache loaded by parent job
+      Configuration conf = HBaseConfiguration.create();
+      conf.set("mapred.job.classpath.archives",
+        context.getConfiguration().get("mapred.job.classpath.archives", ""));
+      conf.set("mapreduce.job.cache.archives.visibilities",
+        context.getConfiguration().get("mapreduce.job.cache.archives.visibilities", ""));
+
+      // can't use IntegrationTest instance of util because it hasn't been
+      // instantiated on the JVM running this method. Create our own.
+      IntegrationTestingUtility util =
+          new IntegrationTestingUtility(conf);
+
+      // this is why we're here: launch a child job. The rest of this should
+      // look a lot like TestImportTsv#testMROnTable.
+      final String table = format("%s-%s-child", NAME, context.getJobID());
+      final String cf = "FAM";
+
+      String[] args = {
+          "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+          "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+          table
+      };
+
+      try {
+        util.createTable(table, cf);
+        LOG.info("testRunFromOutputCommitter: launching child job.");
+        TestImportTsv.doMROnTableTest(util, cf, null, args, 1);
+      } catch (Exception e) {
+        throw new IOException("Underlying MapReduce job failed. Aborting commit.", e);
+      } finally {
+        util.deleteTable(table);
+      }
+    }
+  }
+
+  /**
+   * An {@link OutputFormat} that exposes the {@link JobLaunchingOutputCommitter}.
+   */
+  public static class JobLaunchingOutputFormat extends FileOutputFormat<LongWritable, Text> {
+
+    private OutputCommitter committer = null;
+
+    @Override
+    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
+        throws IOException, InterruptedException {
+      return new RecordWriter<LongWritable, Text>() {
+        @Override
+        public void write(LongWritable key, Text value) throws IOException,
+            InterruptedException {
+          /* do nothing */
+        }
+
+        @Override
+        public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+          /* do nothing */
+        }
+      };
+    }
+
+    @Override
+    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
+        throws IOException {
+      if (committer == null) {
+        Path output = getOutputPath(context);
+        LOG.debug("Using JobLaunchingOutputCommitter.");
+        committer = new JobLaunchingOutputCommitter(output, context);
+      }
+      return committer;
+    }
+  }
+
+  /**
+   * Add classes necessary for integration-test jobs.
+   */
+  public static void addTestDependencyJars(Configuration conf) throws IOException {
+    TableMapReduceUtil.addDependencyJars(conf,
+      org.apache.hadoop.hbase.BaseConfigurable.class, // hbase-server
+      HBaseTestingUtility.class, // hbase-server-test
+      HBaseCommonTestingUtility.class, // hbase-common-test
+      com.google.common.collect.ListMultimap.class, // Guava
+      org.cloudera.htrace.Trace.class); // HTrace
+  }
+
+  /**
+   * {@link TableMapReduceUtil#addDependencyJars(Job)} is used when
+   * configuring a mapreduce job to ensure dependencies of the job are shipped
+   * to the cluster. Sometimes those dependencies are on the classpath, but not
+   * packaged as a jar, for instance, when run at the end of another mapreduce
+   * job. In that case, dependency jars have already been shipped to the cluster
+   * and expanded in the parent job's run folder. This test validates that the
+   * child job's classpath is constructed correctly under that scenario.
+   */
+  @Test
+  public void testRunFromOutputCommitter() throws Exception {
+    LOG.info("Running test testRunFromOutputCommitter.");
+
+    FileSystem fs = FileSystem.get(getConf());
+    Path inputPath = new Path(util.getDataTestDirOnTestFS("parent"), "input.txt");
+    Path outputPath = new Path(util.getDataTestDirOnTestFS("parent"), "output");
+    FSDataOutputStream fout = null;
+    try {
+      fout = fs.create(inputPath, true);
+      fout.write(Bytes.toBytes("testRunFromOutputCommitter\n"));
+      LOG.debug(format("Wrote test data to file: %s", inputPath));
+    } finally {
+      if (fout != null) {
+        fout.close();
+      }
+    }
+
+    // create a parent job that ships the HBase dependencies. This mirrors
+    // the expected calling context.
+    Job job = new Job(getConf(), NAME + ".testRunFromOutputCommitter - parent");
+    job.setJarByClass(IntegrationTestImportTsv.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(JobLaunchingOutputFormat.class);
+    TextInputFormat.addInputPath(job, inputPath);
+    JobLaunchingOutputFormat.setOutputPath(job, outputPath);
+    TableMapReduceUtil.addDependencyJars(job);
+    addTestDependencyJars(job.getConfiguration());
+
+    // Job launched by the OutputCommitter will fail if dependency jars are
+    // not shipped properly.
+    LOG.info("testRunFromOutputCommitter: launching parent job.");
+    assertTrue(job.waitForCompletion(true));
+    LOG.info("testRunFromOutputCommitter completed successfully.");
   }
 
   public int run(String[] args) throws Exception {
@@ -194,6 +356,7 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     // IntegrationTestsDriver does.
     provisionCluster();
     testGenerateAndLoad();
+    testRunFromOutputCommitter();
     releaseCluster();
 
     return 0;
-- 
1.8.1
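For reference, a minimal sketch of how the Tool entry point exercised by run()
might be driven. This main() is illustrative only and is not part of the patch;
it assumes the class is wired through Hadoop's ToolRunner, and run() itself
provisions and releases the cluster as noted in the diff above.

    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // run() calls provisionCluster()/releaseCluster() around the tests,
      // mirroring what IntegrationTestsDriver does.
      int status = ToolRunner.run(conf, new IntegrationTestImportTsv(), args);
      System.exit(status);
    }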