Index: src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (revision 0) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (revision 0) @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestHCatMultiOutputFormat { + + private static final Logger LOG = 
LoggerFactory.getLogger(TestHCatMultiOutputFormat.class); + + private static final String DATABASE = "default"; + private static final String[] tableNames = {"test1", "test2", "test3"}; + private static final String[] tablePerms = {"755", "750", "700"}; + private static Path warehousedir = null; + private static HashMap schemaMap = new HashMap(); + private static HiveMetaStoreClient hmsc; + private static MiniMRCluster mrCluster; + private static Configuration mrConf; + private static HiveConf hiveConf; + private static File workDir; + + private static final String msPort = "20199"; + private static Thread t; + + static { + schemaMap.put(tableNames[0], new HCatSchema(ColumnHolder.hCattest1Cols)); + schemaMap.put(tableNames[1], new HCatSchema(ColumnHolder.hCattest2Cols)); + schemaMap.put(tableNames[2], new HCatSchema(ColumnHolder.hCattest3Cols)); + } + + private static class RunMS implements Runnable { + + @Override + public void run() { + try { + String warehouseConf = HiveConf.ConfVars.METASTOREWAREHOUSE.varname + "=" + + warehousedir.toString(); + HiveMetaStore.main(new String[] {"-v", "-p", msPort, "--hiveconf", warehouseConf}); + } catch (Throwable t) { + System.err.println("Exiting. Got exception from metastore: " + t.getMessage()); + } + } + + } + + /** + * Private class which holds all the data for the test cases + */ + private static class ColumnHolder { + + private static ArrayList hCattest1Cols = new ArrayList(); + private static ArrayList hCattest2Cols = new ArrayList(); + private static ArrayList hCattest3Cols = new ArrayList(); + + private static ArrayList partitionCols = new ArrayList(); + private static ArrayList test1Cols = new ArrayList(); + private static ArrayList test2Cols = new ArrayList(); + private static ArrayList test3Cols = new ArrayList(); + + private static HashMap> colMapping = new HashMap>(); + + static { + try { + FieldSchema keyCol = new FieldSchema("key", Constants.STRING_TYPE_NAME, ""); + test1Cols.add(keyCol); + test2Cols.add(keyCol); + test3Cols.add(keyCol); + hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol)); + hCattest2Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol)); + hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol)); + FieldSchema valueCol = new FieldSchema("value", Constants.STRING_TYPE_NAME, ""); + test1Cols.add(valueCol); + test3Cols.add(valueCol); + hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol)); + hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol)); + FieldSchema extraCol = new FieldSchema("extra", Constants.STRING_TYPE_NAME, ""); + test3Cols.add(extraCol); + hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(extraCol)); + colMapping.put("test1", test1Cols); + colMapping.put("test2", test2Cols); + colMapping.put("test3", test3Cols); + } catch (HCatException e) { + LOG.error("Error in setting up schema fields for the table", e); + throw new RuntimeException(e); + } + } + + static { + partitionCols.add(new FieldSchema("ds", Constants.STRING_TYPE_NAME, "")); + partitionCols.add(new FieldSchema("cluster", Constants.STRING_TYPE_NAME, "")); + } + } + + @BeforeClass + public static void setup() throws Exception { + String testDir = System.getProperty("test.data.dir", "./"); + testDir = testDir + "/test_multitable_" + Math.abs(new Random().nextLong()) + "/"; + workDir = new File(new File(testDir).getCanonicalPath()); + FileUtil.fullyDelete(workDir); + workDir.mkdirs(); + + warehousedir = new Path(workDir + "/warehouse"); + + // Run hive metastore server + t = new Thread(new RunMS()); + 
t.start(); + + // LocalJobRunner does not work with mapreduce OutputCommitter. So need + // to use MiniMRCluster. MAPREDUCE-2350 + Configuration conf = new Configuration(true); + FileSystem fs = FileSystem.get(conf); + System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath()); + mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null, + new JobConf(conf)); + mrConf = mrCluster.createJobConf(); + fs.mkdirs(warehousedir); + + initializeSetup(); + } + + private static void initializeSetup() throws Exception { + + hiveConf = new HiveConf(mrConf, TestHCatMultiOutputFormat.class); + hiveConf.set("hive.metastore.local", "false"); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3); + + hiveConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); + System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); + + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousedir.toString()); + try { + hmsc = new HiveMetaStoreClient(hiveConf, null); + initalizeTables(); + } catch (Throwable e) { + LOG.error("Exception encountered while setting up testcase", e); + throw new Exception(e); + } finally { + hmsc.close(); + } + } + + private static void initalizeTables() throws Exception { + for (String table : tableNames) { + try { + if (hmsc.getTable(DATABASE, table) != null) { + hmsc.dropTable(DATABASE, table); + } + } catch (NoSuchObjectException ignored) { + } + } + for (int i = 0; i < tableNames.length; i++) { + createTable(tableNames[i], tablePerms[i]); + } + } + + private static void createTable(String tableName, String tablePerm) throws Exception { + Table tbl = new Table(); + tbl.setDbName(DATABASE); + tbl.setTableName(tableName); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(ColumnHolder.colMapping.get(tableName)); + tbl.setSd(sd); + sd.setParameters(new HashMap()); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tbl.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName()); + sd.getSerdeInfo().getParameters().put( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1"); + sd.getSerdeInfo().setSerializationLib( + org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName()); + tbl.setPartitionKeys(ColumnHolder.partitionCols); + + hmsc.createTable(tbl); + FileSystem fs = FileSystem.get(mrConf); + fs.setPermission(new Path(warehousedir, tableName), new FsPermission(tablePerm)); + } + + @AfterClass + public static void tearDown() throws IOException { + FileUtil.fullyDelete(workDir); + FileSystem fs = FileSystem.get(mrConf); + if (fs.exists(warehousedir)) { + fs.delete(warehousedir, true); + } + if (mrCluster != null) { + mrCluster.shutdown(); + } + } + + /** + * Simple test case. + *
    + *
+ * 1. Submits a mapred job which writes out one fixed line to each of the tables
+ * 2. Uses a hive fetch task to read the data back and verify that it matches what was written
+ * + * @throws Exception if any error occurs + */ + @Test + public void testOutputFormat() throws Throwable { + HashMap partitionValues = new HashMap(); + partitionValues.put("ds", "1"); + partitionValues.put("cluster", "ag"); + ArrayList infoList = new ArrayList(); + infoList.add(OutputJobInfo.create("default", tableNames[0], partitionValues)); + infoList.add(OutputJobInfo.create("default", tableNames[1], partitionValues)); + infoList.add(OutputJobInfo.create("default", tableNames[2], partitionValues)); + + Job job = new Job(hiveConf, "SampleJob"); + + job.setMapperClass(MyMapper.class); + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(MultiOutputFormat.class); + job.setNumReduceTasks(0); + + JobConfigurer configurer = MultiOutputFormat.createConfigurer(job); + + for (int i = 0; i < tableNames.length; i++) { + configurer.addOutputFormat(tableNames[i], HCatOutputFormat.class, BytesWritable.class, + HCatRecord.class); + HCatOutputFormat.setOutput(configurer.getJob(tableNames[i]), infoList.get(i)); + HCatOutputFormat.setSchema(configurer.getJob(tableNames[i]), + schemaMap.get(tableNames[i])); + } + configurer.configure(); + + Path filePath = createInputFile(); + FileInputFormat.addInputPath(job, filePath); + Assert.assertTrue(job.waitForCompletion(true)); + + ArrayList outputs = new ArrayList(); + for (String tbl : tableNames) { + outputs.add(getTableData(tbl, "default").get(0)); + } + Assert.assertEquals("Comparing output of table " + + tableNames[0] + " is not correct", outputs.get(0), "a,a,1,ag"); + Assert.assertEquals("Comparing output of table " + + tableNames[1] + " is not correct", outputs.get(1), "a,1,ag"); + Assert.assertEquals("Comparing output of table " + + tableNames[2] + " is not correct", outputs.get(2), "a,a,extra,1,ag"); + + // Check permisssion on partition dirs and files created + for (int i = 0; i < tableNames.length; i++) { + Path partitionFile = new Path(warehousedir + "/" + tableNames[i] + + "/ds=1/cluster=ag/part-m-00000"); + FileSystem fs = partitionFile.getFileSystem(mrConf); + Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct", + fs.getFileStatus(partitionFile).getPermission(), + new FsPermission(tablePerms[i])); + Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct", + fs.getFileStatus(partitionFile.getParent()).getPermission(), + new FsPermission(tablePerms[i])); + Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct", + fs.getFileStatus(partitionFile.getParent().getParent()).getPermission(), + new FsPermission(tablePerms[i])); + + } + LOG.info("File permissions verified"); + } + + /** + * Create a input file for map + * + * @return absolute path of the file. 
+ * @throws IOException if any error encountered + */ + private Path createInputFile() throws IOException { + Path f = new Path(workDir + "/MultiTableInput.txt"); + FileSystem fs = FileSystem.get(mrConf); + if (fs.exists(f)) { + fs.delete(f, true); + } + OutputStream out = fs.create(f); + for (int i = 0; i < 3; i++) { + out.write("a,a\n".getBytes()); + } + out.close(); + return f; + } + + /** + * Method to fetch table data + * + * @param table table name + * @param database database + * @return list of columns in comma seperated way + * @throws Exception if any error occurs + */ + private List getTableData(String table, String database) throws Exception { + HiveConf conf = new HiveConf(); + conf.addResource("hive-site.xml"); + ArrayList results = new ArrayList(); + ArrayList temp = new ArrayList(); + Hive hive = Hive.get(conf); + org.apache.hadoop.hive.ql.metadata.Table tbl = hive.getTable(database, table); + FetchWork work; + if (!tbl.getPartCols().isEmpty()) { + List partitions = hive.getPartitions(tbl); + List partDesc = new ArrayList(); + List partLocs = new ArrayList(); + for (Partition part : partitions) { + partLocs.add(part.getLocation()); + partDesc.add(Utilities.getPartitionDesc(part)); + } + work = new FetchWork(partLocs, partDesc); + work.setLimit(100); + } else { + work = new FetchWork(tbl.getDataLocation().toString(), Utilities.getTableDesc(tbl)); + } + FetchTask task = new FetchTask(); + task.setWork(work); + task.initialize(conf, null, null); + task.fetch(temp); + for (String str : temp) { + results.add(str.replace("\t", ",")); + } + return results; + } + + private static class MyMapper extends + Mapper { + + private int i = 0; + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + HCatRecord record = null; + String[] splits = value.toString().split(","); + switch (i) { + case 0: + record = new DefaultHCatRecord(2); + record.set(0, splits[0]); + record.set(1, splits[1]); + break; + case 1: + record = new DefaultHCatRecord(1); + record.set(0, splits[0]); + break; + case 2: + record = new DefaultHCatRecord(3); + record.set(0, splits[0]); + record.set(1, splits[1]); + record.set(2, "extra"); + break; + default: + Assert.fail("This should not happen!!!!!"); + } + MultiOutputFormat.write(tableNames[i], null, record, context); + i++; + } + } +} Index: src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (revision 0) +++ src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (revision 0) @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hcatalog.mapreduce; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Random; +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestMultiOutputFormat { + + private static final Logger LOG = LoggerFactory.getLogger(TestMultiOutputFormat.class); + private static File workDir; + private static Configuration mrConf = null; + private static FileSystem fs = null; + private static MiniMRCluster mrCluster = null; + + @BeforeClass + public static void setup() throws IOException { + createWorkDir(); + Configuration conf = new Configuration(true); + fs = FileSystem.get(conf); + System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath()); + // LocalJobRunner does not work with mapreduce OutputCommitter. So need + // to use MiniMRCluster. MAPREDUCE-2350 + mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null, + new JobConf(conf)); + mrConf = mrCluster.createJobConf(); + } + + private static void createWorkDir() throws IOException { + String testDir = System.getProperty("test.data.dir", "./"); + testDir = testDir + "/test_multiout_" + Math.abs(new Random().nextLong()) + "/"; + workDir = new File(new File(testDir).getCanonicalPath()); + FileUtil.fullyDelete(workDir); + workDir.mkdirs(); + } + + @AfterClass + public static void tearDown() throws IOException { + if (mrCluster != null) { + mrCluster.shutdown(); + } + FileUtil.fullyDelete(workDir); + } + + /** + * A test job that reads a input file and outputs each word and the index of + * the word encountered to a text file and sequence file with different key + * values. 
+ */ + @Test + public void testMultiOutputFormatWithoutReduce() throws Throwable { + Job job = new Job(mrConf, "MultiOutNoReduce"); + job.setMapperClass(MultiOutWordIndexMapper.class); + job.setJarByClass(this.getClass()); + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(MultiOutputFormat.class); + job.setNumReduceTasks(0); + + JobConfigurer configurer = MultiOutputFormat.createConfigurer(job); + configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class); + configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class, + IntWritable.class); + Path outDir = new Path(workDir.getPath(), job.getJobName()); + FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1")); + FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2")); + + String fileContent = "Hello World"; + String inputFile = createInputFile(fileContent); + FileInputFormat.setInputPaths(job, new Path(inputFile)); + + configurer.configure(); + Assert.assertTrue(job.waitForCompletion(true)); + + Path textOutPath = new Path(outDir, "out1/part-m-00000"); + String[] textOutput = readFully(textOutPath).split("\n"); + Path seqOutPath = new Path(outDir, "out2/part-m-00000"); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf); + Text key = new Text(); + IntWritable value = new IntWritable(); + String[] words = fileContent.split(" "); + Assert.assertEquals(words.length, textOutput.length); + LOG.info("Verifying file contents"); + for (int i = 0; i < words.length; i++) { + Assert.assertEquals((i + 1) + "\t" + words[i], textOutput[i]); + reader.next(key, value); + Assert.assertEquals(words[i], key.toString()); + Assert.assertEquals((i + 1), value.get()); + } + Assert.assertFalse(reader.next(key, value)); + } + + /** + * A word count test job that reads a input file and outputs the count of + * words to a text file and sequence file with different key values. 
+ */ + @Test + public void testMultiOutputFormatWithReduce() throws Throwable { + Job job = new Job(mrConf, "MultiOutWithReduce"); + + job.setMapperClass(WordCountMapper.class); + job.setReducerClass(MultiOutWordCountReducer.class); + job.setJarByClass(this.getClass()); + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(MultiOutputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + + JobConfigurer configurer = MultiOutputFormat.createConfigurer(job); + + configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class); + configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class, + IntWritable.class); + Path outDir = new Path(workDir.getPath(), job.getJobName()); + FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1")); + FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2")); + + configurer.configure(); + + String fileContent = "Hello World Hello World World"; + String inputFile = createInputFile(fileContent); + FileInputFormat.setInputPaths(job, new Path(inputFile)); + + Assert.assertTrue(job.waitForCompletion(true)); + + Path textOutPath = new Path(outDir, "out1/part-r-00000"); + String[] textOutput = readFully(textOutPath).split("\n"); + Path seqOutPath = new Path(outDir, "out2/part-r-00000"); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf); + Text key = new Text(); + IntWritable value = new IntWritable(); + String[] words = "Hello World".split(" "); + Assert.assertEquals(words.length, textOutput.length); + for (int i = 0; i < words.length; i++) { + Assert.assertEquals((i + 2) + "\t" + words[i], textOutput[i]); + reader.next(key, value); + Assert.assertEquals(words[i], key.toString()); + Assert.assertEquals((i + 2), value.get()); + } + Assert.assertFalse(reader.next(key, value)); + + } + + + /** + * Create a file for map input + * + * @return absolute path of the file. 
+ * @throws IOException if any error encountered + */ + private String createInputFile(String content) throws IOException { + File f = File.createTempFile("input", "txt"); + FileWriter writer = new FileWriter(f); + writer.write(content); + writer.close(); + return f.getAbsolutePath(); + } + + private String readFully(Path file) throws IOException { + FSDataInputStream in = fs.open(file); + byte[] b = new byte[in.available()]; + in.readFully(b); + in.close(); + return new String(b); + } + + private static class MultiOutWordIndexMapper extends + Mapper { + + private IntWritable index = new IntWritable(1); + private Text word = new Text(); + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + MultiOutputFormat.write("out1", index, word, context); + MultiOutputFormat.write("out2", word, index, context); + index.set(index.get() + 1); + } + } + } + + private static class WordCountMapper extends + Mapper { + + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, one); + } + } + } + + private static class MultiOutWordCountReducer extends + Reducer { + + private IntWritable result = new IntWritable(); + + @Override + protected void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + MultiOutputFormat.write("out1", result, key, context); + MultiOutputFormat.write("out2", key, result, context); + } + } + +} Index: src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (revision 0) @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hcatalog.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus.State; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +/** + * The MultiOutputFormat class simplifies writing output data to multiple + * outputs. + *

+ * Multiple output formats can be defined, each with its own OutputFormat class,
+ * key class and value class. Each output format can be configured independently,
+ * without interfering with the configuration of the other output formats.
+ *

+ * Usage pattern for job submission: + * + *

+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ *
+ * job.setMapperClass(WordCountMap.class);
+ * job.setReducerClass(WordCountReduce.class);
+ * job.setInputFormatClass(TextInputFormat.class);
+ * job.setOutputFormatClass(MultiOutputFormat.class);
+ * // Need not define OutputKeyClass and OutputValueClass. They default to
+ * // Writable.class
+ * job.setMapOutputKeyClass(Text.class);
+ * job.setMapOutputValueClass(IntWritable.class);
+ *
+ *
+ * // Create a JobConfigurer that will configure the job with the multiple
+ * // output format information.
+ * JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+ *
+ * // Defines an additional text-based output 'text' for the job.
+ * // Any configuration of this OutputFormat should be done through
+ * // the Job obtained via the configurer.getJob() method.
+ * configurer.addOutputFormat("text", TextOutputFormat.class,
+ *                 IntWritable.class, Text.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("text"), textOutDir);
+ *
+ * // Defines an additional sequence-file-based output 'sequence' for the job
+ * configurer.addOutputFormat("sequence", SequenceFileOutputFormat.class,
+ *                 Text.class, IntWritable.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("sequence"), seqOutDir);
+ * ...
+ * // configure() must be called on the JobConfigurer once all the
+ * // output formats have been defined and configured.
+ * configurer.configure();
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * 
+ *

+ * Usage in Reducer: + * + *

+ * public class WordCountReduce extends
+ *         Reducer<Text, IntWritable, Writable, Writable> {
+ *
+ *     private IntWritable count = new IntWritable();
+ *
+ *     public void reduce(Text word, Iterable<IntWritable> values,
+ *             Context context)
+ *             throws IOException, InterruptedException {
+ *         int sum = 0;
+ *         for (IntWritable val : values) {
+ *             sum += val.get();
+ *         }
+ *         count.set(sum);
+ *         MultiOutputFormat.write("text", count, word, context);
+ *         MultiOutputFormat.write("sequence", word, count, context);
+ *     }
+ *
+ * }
+ *
+ * 
+ * + * Map only jobs: + *
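+ * An illustrative map-only sketch (the class name and the 'text'/'sequence'
+ * aliases below are examples mirroring the job-submission snippet above, not
+ * part of the API); it tags each input line with a running index and writes
+ * it to both outputs:
+ *
+ * // Example only: assumes the 'text' and 'sequence' aliases defined above
+ * public class WordIndexMap extends
+ *         Mapper<LongWritable, Text, Writable, Writable> {
+ *
+ *     private IntWritable index = new IntWritable(1);
+ *
+ *     public void map(LongWritable key, Text value, Context context)
+ *             throws IOException, InterruptedException {
+ *         MultiOutputFormat.write("text", index, value, context);
+ *         MultiOutputFormat.write("sequence", value, index, context);
+ *         index.set(index.get() + 1);
+ *     }
+ * }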

+ * MultiOutputFormat.write("output", key, value, context); can be called similar + * to a reducer in map only jobs. + * + */ +public class MultiOutputFormat extends OutputFormat { + + private static final String MO_ALIASES = "mapreduce.multiout.aliases"; + private static final String MO_ALIAS = "mapreduce.multiout.alias"; + private static final String CONF_KEY_DELIM = "%%"; + private static final String CONF_VALUE_DELIM = ";;"; + private static final String COMMA_DELIM = ","; + private static final List configsToOverride = new ArrayList(); + private static final List configsToMerge = new ArrayList(); + + static { + configsToOverride.add("mapred.output.dir"); + configsToMerge.add(JobContext.JOB_NAMENODES); + configsToMerge.add("tmpfiles"); + configsToMerge.add("tmpjars"); + configsToMerge.add("tmparchives"); + } + + /** + * Get a JobConfigurer instance that will support configuration of the job + * for multiple output formats. + * + * @param job the mapreduce job to be submitted + * @return + */ + public static JobConfigurer createConfigurer(Job job) { + return JobConfigurer.create(job); + } + + /** + * Write the output key and value using the OutputFormat defined by the + * alias. + * + * @param alias the name given to the OutputFormat configuration + * @param key the output key to be written + * @param value the output value to be written + * @param context the Mapper or Reducer Context + * @throws IOException + * @throws InterruptedException + */ + public static void write(String alias, K key, V value, TaskInputOutputContext context) + throws IOException, InterruptedException { + KeyValue keyval = new KeyValue(key, value); + context.write(new Text(alias), keyval); + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + for (String alias : getOutputFormatAliases(context)) { + JobContext aliasContext = getJobContext(alias, context); + OutputFormat outputFormat = getOutputFormatInstance(aliasContext); + outputFormat.checkOutputSpecs(aliasContext); + // Copy credentials and any new config added back to JobContext + context.getCredentials().addAll(aliasContext.getCredentials()); + setAliasConf(alias, context, aliasContext); + } + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException, + InterruptedException { + return new MultiRecordWriter(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, + InterruptedException { + return new MultiOutputCommitter(context); + } + + private static OutputFormat getOutputFormatInstance(JobContext context) { + OutputFormat outputFormat; + try { + outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(), + context.getConfiguration()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + return outputFormat; + } + + private static String[] getOutputFormatAliases(JobContext context) { + return context.getConfiguration().getStrings(MO_ALIASES); + } + + /** + * Compare the aliasContext with userJob and add the differing configuration + * as mapreduce.multiout.alias..conf to the userJob. + *

+ * Merge configs such as tmpjars, tmpfiles, tmparchives and
+ * mapreduce.job.hdfs-servers, which are handled directly by the JobClient,
+ * and add them to userJob.
+ *
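+ * For example (values purely illustrative): if the user job already has
+ * tmpjars set to a.jar and an alias configuration sets tmpjars to b.jar,
+ * the submitted job ends up with tmpjars containing both a.jar and b.jar.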

+ * Add mapred.output.dir config to userJob. + * + * @param alias alias name associated with a OutputFormat + * @param userJob reference to Job that the user is going to submit + * @param aliasContext JobContext populated with OutputFormat related + * configuration. + */ + private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) { + Configuration userConf = userJob.getConfiguration(); + StringBuilder builder = new StringBuilder(); + for (Entry conf : aliasContext.getConfiguration()) { + String key = conf.getKey(); + String value = conf.getValue(); + String jobValue = userConf.getRaw(key); + if (jobValue == null || !jobValue.equals(value)) { + if (configsToMerge.contains(key)) { + String mergedValue = getMergedConfValue(jobValue, value); + userConf.set(key, mergedValue); + } else { + if (configsToOverride.contains(key)) { + userConf.set(key, value); + } + builder.append(key).append(CONF_KEY_DELIM).append(value) + .append(CONF_VALUE_DELIM); + } + } + } + builder.delete(builder.length() - CONF_VALUE_DELIM.length(), builder.length()); + userConf.set(getAliasConfName(alias), builder.toString()); + } + + private static String getMergedConfValue(String originalValues, String newValues) { + if (originalValues == null) { + return newValues; + } + Set mergedValues = new HashSet(); + mergedValues.addAll(StringUtils.getStringCollection(originalValues)); + mergedValues.addAll(StringUtils.getStringCollection(newValues)); + StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2); + for (String value : mergedValues) { + builder.append(value).append(COMMA_DELIM); + } + return builder.substring(0, builder.length() - COMMA_DELIM.length()); + } + + private static JobContext getJobContext(String alias, JobContext context) { + String aliasConf = context.getConfiguration().get(getAliasConfName(alias)); + JobContext aliasContext = new JobContext(context.getConfiguration(), context.getJobID()); + addToConfig(aliasConf, aliasContext.getConfiguration()); + return aliasContext; + } + + private static TaskAttemptContext getTaskContext(String alias, TaskAttemptContext context) { + String aliasConf = context.getConfiguration().get(getAliasConfName(alias)); + TaskAttemptContext aliasContext = new TaskAttemptContext(context.getConfiguration(), + context.getTaskAttemptID()); + addToConfig(aliasConf, aliasContext.getConfiguration()); + return aliasContext; + } + + private static String getAliasConfName(String alias) { + return MO_ALIAS + "." + alias + ".conf"; + } + + private static void addToConfig(String aliasConf, Configuration conf) { + String[] config = aliasConf.split(CONF_KEY_DELIM + "|" + CONF_VALUE_DELIM); + for (int i = 0; i < config.length; i += 2) { + conf.set(config[i], config[i + 1]); + } + } + + /** + * Class that supports configuration of the job for multiple output formats. + */ + public static class JobConfigurer { + + private final Job job; + private Map outputConfigs = new LinkedHashMap(); + + private JobConfigurer(Job job) { + this.job = job; + } + + private static JobConfigurer create(Job job) { + JobConfigurer configurer = new JobConfigurer(job); + return configurer; + } + + /** + * Add a OutputFormat configuration to the Job with a alias name. 
+ * + * @param alias the name to be given to the OutputFormat configuration + * @param outputFormatClass OutputFormat class + * @param keyClass the key class for the output data + * @param valueClass the value class for the output data + * @throws IOException + */ + public void addOutputFormat(String alias, + Class outputFormatClass, + Class keyClass, Class valueClass) throws IOException { + Job copy = new Job(this.job.getConfiguration()); + outputConfigs.put(alias, copy); + copy.setOutputFormatClass(outputFormatClass); + copy.setOutputKeyClass(keyClass); + copy.setOutputValueClass(valueClass); + } + + /** + * Get the Job configuration for a OutputFormat defined by the alias + * name. The job returned by this method should be passed to the + * OutputFormat for any configuration instead of the Job that will be + * submitted to the JobClient. + * + * @param alias the name used for the OutputFormat during + * addOutputFormat + * @return + */ + public Job getJob(String alias) { + Job copy = outputConfigs.get(alias); + if (copy == null) { + throw new IllegalArgumentException("OutputFormat with alias " + alias + + " has not beed added"); + } + return copy; + } + + /** + * Configure the job with the multiple output formats added. This method + * should be called after all the output formats have been added and + * configured and before the job submission. + */ + public void configure() { + StringBuilder aliases = new StringBuilder(); + Configuration jobConf = job.getConfiguration(); + for (Entry entry : outputConfigs.entrySet()) { + // Copy credentials + job.getCredentials().addAll(entry.getValue().getCredentials()); + String alias = entry.getKey(); + aliases.append(alias).append(COMMA_DELIM); + // Store the differing configuration for each alias in the job + // as a setting. + setAliasConf(alias, job, entry.getValue()); + } + aliases.delete(aliases.length() - COMMA_DELIM.length(), aliases.length()); + jobConf.set(MO_ALIASES, aliases.toString()); + } + + } + + private static class KeyValue implements Writable { + private final K key; + private final V value; + + public KeyValue(K key, V value) { + this.key = key; + this.value = value; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + + @Override + public void write(DataOutput out) throws IOException { + // Ignore. Not required as this will be never + // serialized/deserialized. + } + + @Override + public void readFields(DataInput in) throws IOException { + // Ignore. Not required as this will be never + // serialized/deserialized. + } + } + + private static class MultiRecordWriter extends RecordWriter { + + private final Map baseRecordWriters; + + public MultiRecordWriter(TaskAttemptContext context) throws IOException, + InterruptedException { + baseRecordWriters = new LinkedHashMap(); + String[] aliases = getOutputFormatAliases(context); + for (String alias : aliases) { + TaskAttemptContext aliasContext = getTaskContext(alias, context); + Configuration aliasConf = aliasContext.getConfiguration(); + // Create output directory if not already created. 
+ String outDir = aliasConf.get("mapred.output.dir"); + if (outDir != null) { + Path outputDir = new Path(outDir); + FileSystem fs = outputDir.getFileSystem(aliasConf); + if (!fs.exists(outputDir)) { + fs.mkdirs(outputDir); + } + } + OutputFormat outputFormat = getOutputFormatInstance(aliasContext); + baseRecordWriters.put(alias, + new BaseRecordWriterContainer(outputFormat.getRecordWriter(aliasContext), + aliasContext)); + } + } + + @Override + public void write(Writable key, Writable value) throws IOException, InterruptedException { + Text _key = (Text) key; + KeyValue _value = (KeyValue) value; + String alias = new String(_key.getBytes(), 0, _key.getLength()); + BaseRecordWriterContainer baseRWContainer = baseRecordWriters.get(alias); + if (baseRWContainer == null) { + throw new IllegalArgumentException("OutputFormat with alias " + alias + + " has not been added"); + } + baseRWContainer.getRecordWriter().write(_value.getKey(), _value.getValue()); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + for (BaseRecordWriterContainer baseRWContainer : baseRecordWriters.values()) { + baseRWContainer.getRecordWriter().close(baseRWContainer.getContext()); + } + } + + } + + private static class BaseRecordWriterContainer { + + private final RecordWriter recordWriter; + private final TaskAttemptContext context; + + public BaseRecordWriterContainer(RecordWriter recordWriter, TaskAttemptContext context) { + this.recordWriter = recordWriter; + this.context = context; + } + + public RecordWriter getRecordWriter() { + return recordWriter; + } + + public TaskAttemptContext getContext() { + return context; + } + } + + private class MultiOutputCommitter extends OutputCommitter { + + private final Map outputCommitters; + + public MultiOutputCommitter(TaskAttemptContext context) throws IOException, + InterruptedException { + outputCommitters = new LinkedHashMap(); + String[] aliases = getOutputFormatAliases(context); + for (String alias : aliases) { + TaskAttemptContext aliasContext = getTaskContext(alias, context); + OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext) + .getOutputCommitter(aliasContext); + outputCommitters.put(alias, + new BaseOutputCommitterContainer(baseCommitter, aliasContext)); + } + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + for (String alias : outputCommitters.keySet()) { + BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); + outputContainer.getBaseCommitter().setupJob(outputContainer.getContext()); + } + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + for (String alias : outputCommitters.keySet()) { + BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); + outputContainer.getBaseCommitter().setupTask(outputContainer.getContext()); + } + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { + boolean needTaskCommit = false; + for (String alias : outputCommitters.keySet()) { + BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); + needTaskCommit = needTaskCommit + || outputContainer.getBaseCommitter().needsTaskCommit( + outputContainer.getContext()); + } + return needTaskCommit; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + for (String alias : outputCommitters.keySet()) { + BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); + 
outputContainer.getBaseCommitter().commitTask(outputContainer.getContext()); + } + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + for (String alias : outputCommitters.keySet()) { + BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); + outputContainer.getBaseCommitter().abortTask(outputContainer.getContext()); + } + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + for (String alias : outputCommitters.keySet()) { + BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); + outputContainer.getBaseCommitter().commitJob(outputContainer.getContext()); + } + } + + @Override + public void abortJob(JobContext jobContext, State state) throws IOException { + for (String alias : outputCommitters.keySet()) { + BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); + outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state); + } + } + } + + private static class BaseOutputCommitterContainer { + + private final OutputCommitter outputCommitter; + private final TaskAttemptContext context; + + public BaseOutputCommitterContainer(OutputCommitter outputCommitter, + TaskAttemptContext context) { + this.outputCommitter = outputCommitter; + this.context = context; + } + + public OutputCommitter getBaseCommitter() { + return outputCommitter; + } + + public TaskAttemptContext getContext() { + return context; + } + } + +} Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1329472) +++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (working copy) @@ -48,6 +48,8 @@ import org.apache.hcatalog.data.schema.HCatSchemaUtils; import org.apache.hcatalog.har.HarOutputCommitterPostProcessor; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; @@ -64,6 +66,7 @@ */ class FileOutputCommitterContainer extends OutputCommitterContainer { + private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class); private final boolean dynamicPartitioningUsed; private boolean partitionsDiscovered; @@ -436,20 +439,18 @@ partition.setParameters(params); - // Sets permissions and group name on partition dirs. + // Sets permissions and group name on partition dirs and files. Path partPath = new Path(partLocnRoot); - for(FieldSchema partKey : table.getPartitionKeys()){ + int i = 0; + for (FieldSchema partKey : table.getPartitionKeys()) { + if (i++ != 0) { + applyGroupAndPerms(fs, partPath, perms, grpName, false); + } partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); -// LOG.info("Setting perms for "+partPath.toString()); - fs.setPermission(partPath, perms); - try{ - fs.setOwner(partPath, null, grpName); - } catch(AccessControlException ace){ - // log the messages before ignoring. Currently, logging is not built in Hcatalog. -// LOG.warn(ace); - } } + // Apply the group and permissions to the leaf partition and files. 
+ applyGroupAndPerms(fs, partPath, perms, grpName, true); if (dynamicPartitioningUsed){ String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs); if (harProcessor.isEnabled()){ @@ -466,8 +467,31 @@ return partition; } + private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission permission, + String group, boolean recursive) + throws IOException { + fs.setPermission(dir, permission); + try { + fs.setOwner(dir, null, group); + } catch (AccessControlException ace) { + LOG.warn("Error changing group of " + dir, ace); + } + if (recursive) { + for (FileStatus fileStatus : fs.listStatus(dir)) { + if (fileStatus.isDir()) { + applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive); + } else { + fs.setPermission(fileStatus.getPath(), permission); + try { + fs.setOwner(dir, null, group); + } catch (AccessControlException ace) { + LOG.warn("Error changing group of " + dir, ace); + } + } + } + } + } - private String getFinalDynamicPartitionDestination(Table table, Map partKVs) { // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA -> // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA