Index: src/test/org/apache/hcatalog/mapreduce/TestMultiTableHCatOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestMultiTableHCatOutputFormat.java (revision 0) +++ src/test/org/apache/hcatalog/mapreduce/TestMultiTableHCatOutputFormat.java (revision 0) @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hcatalog.MiniCluster; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class TestMultiTableHCatOutputFormat { + + private static final Logger LOG = 
LoggerFactory.getLogger(TestMultiTableHCatOutputFormat.class); + + private static final String DATABASE = "default"; + private static final String[] tableNames = {"test1", "test2", "test3"}; + private static final String[] tablePerms = {"755", "750", "700"}; + private static Path warehousedir = null; + private static HashMap schemaMap = Maps.newHashMap(); + private HiveMetaStoreClient hmsc; + private MiniCluster cluster; + private Configuration clusterConf = new Configuration(true); + private HiveConf hiveConf; + private File workDir; + + private static final String msPort = "20199"; + private Thread t; + + static { + schemaMap.put(tableNames[0], new HCatSchema(ColumnHolder.hCattest1Cols)); + schemaMap.put(tableNames[1], new HCatSchema(ColumnHolder.hCattest2Cols)); + schemaMap.put(tableNames[2], new HCatSchema(ColumnHolder.hCattest3Cols)); + } + + private static class RunMS implements Runnable { + + @Override + public void run() { + try { + String warehouseConf = HiveConf.ConfVars.METASTOREWAREHOUSE.varname + "=" + + warehousedir.toString(); + HiveMetaStore.main(new String[] {"-v", "-p", msPort, "--hiveconf", warehouseConf}); + } catch (Throwable t) { + System.err.println("Exiting. Got exception from metastore: " + t.getMessage()); + } + } + + } + + /** + * Private class which holds all the data for the test cases + */ + private static class ColumnHolder { + + private static ArrayList hCattest1Cols = Lists.newArrayList(); + private static ArrayList hCattest2Cols = Lists.newArrayList(); + private static ArrayList hCattest3Cols = Lists.newArrayList(); + + private static ArrayList partitionCols = Lists.newArrayList(); + private static ArrayList test1Cols = Lists.newArrayList(); + private static ArrayList test2Cols = Lists.newArrayList(); + private static ArrayList test3Cols = Lists.newArrayList(); + + private static HashMap> colMapping = Maps.newHashMap(); + + static { + try { + FieldSchema keyCol = new FieldSchema("key", Constants.STRING_TYPE_NAME, ""); + test1Cols.add(keyCol); + test2Cols.add(keyCol); + test3Cols.add(keyCol); + hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol)); + hCattest2Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol)); + hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol)); + FieldSchema valueCol = new FieldSchema("value", Constants.STRING_TYPE_NAME, ""); + test1Cols.add(valueCol); + test3Cols.add(valueCol); + hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol)); + hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol)); + FieldSchema extraCol = new FieldSchema("extra", Constants.STRING_TYPE_NAME, ""); + test3Cols.add(extraCol); + hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(extraCol)); + colMapping.put("test1", test1Cols); + colMapping.put("test2", test2Cols); + colMapping.put("test3", test3Cols); + } catch (HCatException e) { + LOG.error("Error in setting up schema fields for the table", e); + throw new RuntimeException(e); + } + } + + static { + partitionCols.add(new FieldSchema("ds", Constants.STRING_TYPE_NAME, "")); + partitionCols.add(new FieldSchema("cluster", Constants.STRING_TYPE_NAME, "")); + } + } + + @Before + public void setup() throws Exception { + String testDir = System.getProperty("test.data.dir", "./"); + testDir = testDir + "/test_multitable_" + Math.abs(new Random().nextLong()) + "/"; + workDir = new File(new File(testDir).getCanonicalPath()); + FileUtil.fullyDelete(workDir); + workDir.mkdirs(); + + warehousedir = new Path(workDir + "/warehouse"); + + //Run hive metastore server + t = new Thread(new 
RunMS()); + t.start(); + + //LocalJobRunner does not work with mapreduce OutputCommitter. So need + //to use MiniMRCluster MAPREDUCE-2350 + cluster = MiniCluster.buildCluster(); + for (Entry entry : cluster.getProperties().entrySet()) { + if(entry.getKey().equals("fs.default.name")) { + //Using local file system instead of dfs + continue; + } + clusterConf.set((String) entry.getKey(), (String) entry.getValue()); + } + cluster.getFileSystem().mkdirs(warehousedir); + + initializeSetup(); + } + + private void initializeSetup() throws Exception { + + hiveConf = new HiveConf(this.getClass()); + hiveConf.set("hive.metastore.local", "false"); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3); + + hiveConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); + System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); + + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousedir.toString()); + try { + hmsc = new HiveMetaStoreClient(hiveConf, null); + initalizeTables(); + } catch (Throwable e) { + LOG.error("Exception encountered while setting up testcase", e); + throw new Exception(e); + } finally { + hmsc.close(); + } + } + + private void initalizeTables() throws Exception { + for (String table : tableNames) { + try { + if (hmsc.getTable(DATABASE, table) != null) { + hmsc.dropTable(DATABASE, table); + } + } catch (NoSuchObjectException ignored) { + } + } + for (int i=0; i < tableNames.length; i++) { + createTable(tableNames[i], tablePerms[i]); + } + } + + private void createTable(String tableName, String tablePerm) throws Exception { + Table tbl = new Table(); + tbl.setDbName(DATABASE); + tbl.setTableName(tableName); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(ColumnHolder.colMapping.get(tableName)); + tbl.setSd(sd); + sd.setParameters(new HashMap()); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tbl.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName()); + sd.getSerdeInfo().getParameters().put( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1"); + sd.getSerdeInfo().setSerializationLib( + org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName()); + tbl.setPartitionKeys(ColumnHolder.partitionCols); + + hmsc.createTable(tbl); + FileSystem fs = FileSystem.get(clusterConf); + fs.setPermission(new Path(warehousedir, tableName), new FsPermission(tablePerm)); + } + + @After + public void cleanup() throws IOException { + FileUtil.fullyDelete(workDir); + FileSystem fs = FileSystem.get(clusterConf); + if (fs.exists(warehousedir)) { + fs.delete(warehousedir, true); + } + cluster.shutDown(); + } + + /** + * Simple test case. + *
    + *
+   * 1. Submits a mapred job which writes out one fixed line to each of the tables
+   * 2. Uses a Hive fetch task to read the data back and checks that it matches what was written
+ * + * @throws Exception if any error occurs + */ + @Test + public void testOutputFormat() throws Exception { + HashMap partitionValues = Maps.newHashMap(); + partitionValues.put("ds", "1"); + partitionValues.put("cluster", "ag"); + OutputJobInfo info1 = OutputJobInfo.create("default", tableNames[0], partitionValues); + OutputJobInfo info2 = OutputJobInfo.create("default", tableNames[1], partitionValues); + OutputJobInfo info3 = OutputJobInfo.create("default", tableNames[2], partitionValues); + Job job = new Job(clusterConf, "SampleJob"); + //Trying to retain reference to local metastore file + //without creating a hive-site.xml in classpath. + job.getConfiguration().set(HCatConstants.HCAT_KEY_HIVE_CONF, + HCatUtil.serialize(hiveConf.getAllProperties())); + + ArrayList infoList = new ArrayList(); + infoList.add(info1); + infoList.add(info2); + infoList.add(info3); + MultiTableHCatOutputFormat.setOutput(job, infoList); + LOG.debug("Filesystem is {}", FileSystem.get(clusterConf).getUri()); + for (String table : tableNames) { + Assert.assertNotNull( + "The table information of the " + table + " " + + "is found to be null", + job.getConfiguration().get( + MultiTableHCatOutputFormat.getTableBasedJobInfoProperty(table))); + } + Path filePath = createInputFile(); + for (String table : tableNames) { + MultiTableHCatOutputFormat.setSchema(job, schemaMap.get(table), table); + } + job.setMapperClass(MyMapper.class); + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(MultiTableHCatOutputFormat.class); + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(TableBasedHCatRecord.class); + job.setNumReduceTasks(0); + + FileInputFormat.addInputPath(job, filePath); + Assert.assertTrue(job.waitForCompletion(true)); + + ArrayList outputs = Lists.newArrayList(); + for (String tbl : tableNames) { + outputs.add(getTableData(tbl, "default").get(0)); + } + Assert.assertEquals("Comparing output of table " + + tableNames[0] + " is not correct", outputs.get(0), "a,a,1,ag"); + Assert.assertEquals("Comparing output of table " + + tableNames[1] + " is not correct", outputs.get(1), "a,1,ag"); + Assert.assertEquals("Comparing output of table " + + tableNames[2] + " is not correct", outputs.get(2), "a,a,extra,1,ag"); + + //Check permisssion on partition dirs and files created + for (int i=0; i < tableNames.length; i++) { + Path partitionFile = new Path(warehousedir + "/" + tableNames[i] + + "/ds=1/cluster=ag/part-m-00000"); + FileSystem fs = partitionFile.getFileSystem(clusterConf); + Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct", + fs.getFileStatus(partitionFile).getPermission(), + new FsPermission(tablePerms[i])); + Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct", + fs.getFileStatus(partitionFile.getParent()).getPermission(), + new FsPermission(tablePerms[i])); + Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct", + fs.getFileStatus(partitionFile.getParent().getParent()).getPermission(), + new FsPermission(tablePerms[i])); + + } + } + + /** + * Create a dummy file for map input + * + * @return absolute path of the file. 
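For reference, a condensed driver-side sketch of the wiring the test above performs. It reuses only calls that appear in this patch (OutputJobInfo.create, MultiTableHCatOutputFormat.setOutput/setSchema, TableBasedHCatRecord); the database name "default", the table names table_a/table_b, the single "key" column, and the class name MultiTableDriverSketch are illustrative placeholders. The mapper side would emit TableBasedHCatRecord values the way MyMapper does in the test.

    package org.apache.hcatalog.mapreduce;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.serde.Constants;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.data.schema.HCatSchemaUtils;

    public class MultiTableDriverSketch {

      public static Job configure(Configuration conf) throws Exception {
        Map<String, String> partitionValues = new HashMap<String, String>();
        partitionValues.put("ds", "1");

        // One OutputJobInfo per target table, registered through a single setOutput call.
        List<OutputJobInfo> outputs = new ArrayList<OutputJobInfo>();
        outputs.add(OutputJobInfo.create("default", "table_a", partitionValues));
        outputs.add(OutputJobInfo.create("default", "table_b", partitionValues));

        Job job = new Job(conf, "multi-table-write");
        MultiTableHCatOutputFormat.setOutput(job, outputs);

        // Per-table schemas; here both placeholder tables share one string column "key".
        List<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
        cols.add(HCatSchemaUtils.getHCatFieldSchema(
            new FieldSchema("key", Constants.STRING_TYPE_NAME, "")));
        HCatSchema schema = new HCatSchema(cols);
        MultiTableHCatOutputFormat.setSchema(job, schema, "table_a");
        MultiTableHCatOutputFormat.setSchema(job, schema, "table_b");

        // job.setMapperClass(...) would point at a mapper that, like MyMapper above,
        // tags each output row with its destination table via TableBasedHCatRecord.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MultiTableHCatOutputFormat.class);
        job.setMapOutputKeyClass(BytesWritable.class);
        job.setMapOutputValueClass(TableBasedHCatRecord.class);
        job.setNumReduceTasks(0);
        return job;
      }
    }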
+ * @throws IOException if any error encountered + */ + private Path createInputFile() throws IOException { + Path f = new Path(workDir + "/MultiTableInput.txt"); + FileSystem fs = FileSystem.get(clusterConf); + if (fs.exists(f)) { + fs.delete(f, true); + } + OutputStream out = fs.create(f, true, 1024, (short)1, 5000L); + for (int i = 0; i < 3; i++) { + out.write("a,a\n".getBytes()); + } + out.close(); + return f; + } + + /** + * Method to fetch table data + * + * @param table table name + * @param database database + * @return list of columns in comma seperated way + * @throws Exception if any error occurs + */ + private List getTableData(String table, + String database + ) throws Exception { + HiveConf conf = new HiveConf(); + conf.addResource("hive-site.xml"); + ArrayList results = Lists.newArrayList(); + ArrayList temp = Lists.newArrayList(); + Hive hive = Hive.get(conf); + org.apache.hadoop.hive.ql.metadata.Table tbl = hive.getTable(database, table); + FetchWork work; + if (!tbl.getPartCols().isEmpty()) { + List partitions = hive.getPartitions(tbl); + List partDesc = Lists.newArrayList(); + List partLocs = new ArrayList(); + for (Partition part : partitions) { + partLocs.add(part.getLocation()); + partDesc.add(Utilities.getPartitionDesc(part)); + } + work = new FetchWork(partLocs, partDesc); + work.setLimit(100); + } else { + work = new FetchWork(tbl.getDataLocation().toString(), + Utilities.getTableDesc(tbl)); + } + FetchTask task = new FetchTask(); + task.setWork(work); + task.initialize(conf, null, null); + task.fetch(temp); + for (String str : temp) { + results.add(str.replace("\t", ",")); + } + return results; + } + + public static class MyMapper extends + Mapper { + private int i = 0; + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + TableBasedHCatRecord record = null; + String[] splits = value.toString().split(","); + switch (i) { + case 0: + record = new TableBasedHCatRecord(tableNames[0], 2); + record.set(0, splits[0]); + record.set(1, splits[1]); + break; + case 1: + record = new TableBasedHCatRecord(tableNames[1], 1); + record.set(0, splits[0]); + break; + case 2: + record = new TableBasedHCatRecord(tableNames[2], 3); + record.set(0, splits[0]); + record.set(1, splits[1]); + record.set(2, "extra"); + break; + default: + Assert.fail("This should never happen!!!!!"); + } + i++; + if (LOG.isDebugEnabled()) { + LOG.debug("I am writing to {} data is {}", record.getTableName(), + record.getAll()); + } + context.write(null, record); + } + } +} Index: src/java/org/apache/hcatalog/mapreduce/MultiTableHCatOutputCommitter.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/MultiTableHCatOutputCommitter.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/MultiTableHCatOutputCommitter.java (revision 0) @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus.State; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.data.HCatRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MultiTableHCatOutputCommitter extends OutputCommitter { + + private static final Logger LOG = LoggerFactory.getLogger( + MultiTableHCatOutputCommitter.class); + + private final Map tableCommitters; + + public MultiTableHCatOutputCommitter(TaskAttemptContext taskContext) throws IOException, + InterruptedException { + this.tableCommitters = new HashMap(); + String[] tables = taskContext.getConfiguration().getStrings( + MultiTableHCatOutputFormat.HCAT_TABLE_LIST); + for (String table : tables) { + OutputJobInfo info = MultiTableHCatOutputFormat.getJobInfo(taskContext, table); + TaskAttemptContext clone = MultiTableHCatOutputFormat.cloneTaskContextForTable(table, + taskContext); + OutputFormat, HCatRecord> outputFormat = MultiTableHCatOutputFormat + .getOutputFormat(info, clone); + OutputCommitter baseCommitter = outputFormat.getOutputCommitter(clone); + this.tableCommitters.put(table, new OutputCommitterContainer(baseCommitter, clone)); + } + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + if (tableCommitters != null) { + for (String table : tableCommitters.keySet()) { + OutputCommitterContainer outputContainer = tableCommitters.get(table); + outputContainer.getBaseCommitter().setupTask(outputContainer.getTaskContext()); + } + } + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { + boolean needTaskCommit = false; + if (tableCommitters != null) { + for (String table : tableCommitters.keySet()) { + OutputCommitterContainer outputContainer = tableCommitters.get(table); + needTaskCommit = needTaskCommit || + outputContainer.getBaseCommitter().needsTaskCommit( + outputContainer.getTaskContext()); + } + } + return needTaskCommit; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + if (tableCommitters != null) { + for (String table : tableCommitters.keySet()) { + OutputCommitterContainer outputContainer = tableCommitters.get(table); + outputContainer.getBaseCommitter().commitTask(outputContainer.getTaskContext()); + } + } + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + if (tableCommitters != null) { + for (String table : tableCommitters.keySet()) { + OutputCommitterContainer outputContainer = tableCommitters.get(table); + outputContainer.getBaseCommitter().abortTask(outputContainer.getTaskContext()); + } + } + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + if (tableCommitters != null) { + for (String table : 
tableCommitters.keySet()) { + OutputCommitterContainer outputContainer = tableCommitters.get(table); + outputContainer.getBaseCommitter().setupJob(outputContainer.getJobContext()); + } + } + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + LOG.info("Committing job for tables " + + jobContext.getConfiguration().get(MultiTableHCatOutputFormat.HCAT_TABLE_LIST)); + if (tableCommitters != null) { + for (String table : tableCommitters.keySet()) { + LOG.info("Committing job for table " + table); + OutputCommitterContainer outputContainer = tableCommitters.get(table); + outputContainer.getBaseCommitter().commitJob(outputContainer.getJobContext()); + } + } + cleanupJob(jobContext); + } + + @Override + public void abortJob(JobContext jobContext, State state) throws IOException { + LOG.info("Aborting job for tables " + + jobContext.getConfiguration().get(MultiTableHCatOutputFormat.HCAT_TABLE_LIST)); + if (tableCommitters != null) { + for (String table : tableCommitters.keySet()) { + LOG.info("Aborting job for table " + table); + OutputCommitterContainer outputContainer = tableCommitters.get(table); + outputContainer.getBaseCommitter().abortJob(outputContainer.getJobContext(), state); + } + } + cleanupJob(jobContext); + } + + @Override + public void cleanupJob(JobContext jobContext) throws IOException { + // TODO: Optimization. Get DT only once and cancel only once. + // Currently one DT issued per table. + } + + private static class OutputCommitterContainer { + + private final OutputCommitter baseCommitter; + private final TaskAttemptContext taskContext; + private final JobContext jobContext; + + public OutputCommitterContainer(OutputCommitter baseCommitter, + TaskAttemptContext taskContext) { + this.baseCommitter = baseCommitter; + this.taskContext = taskContext; + this.jobContext = new JobContext(taskContext.getConfiguration(), taskContext.getJobID()); + } + + public OutputCommitter getBaseCommitter() { + return baseCommitter; + } + + public TaskAttemptContext getTaskContext() { + return taskContext; + } + + public JobContext getJobContext() { + return jobContext; + } + + } + +} Index: src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (revision 1303489) +++ src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (working copy) @@ -113,11 +113,13 @@ @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + HiveMetaStoreClient client = null; try { HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration()); + client = HCatUtil.createHiveClient(hiveConf); handleDuplicatePublish(context, jobInfo, - HCatUtil.createHiveClient(hiveConf), + client, jobInfo.getTableInfo().getTable()); } catch (MetaException e) { throw new IOException(e); @@ -125,6 +127,8 @@ throw new IOException(e); } catch (NoSuchObjectException e) { throw new IOException(e); + } finally { + HCatUtil.closeHiveClientQuietly(client); } if(!jobInfo.isDynamicPartitioningUsed()) { Index: src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (revision 1303489) +++ src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (working copy) @@ -19,10 +19,7 @@ package 
org.apache.hcatalog.mapreduce; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.ql.io.RCFile; @@ -32,13 +29,11 @@ import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatUtil; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -95,13 +90,13 @@ } @Override - public void configureInputJobProperties(TableDesc tableDesc, + public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { } @Override - public void configureOutputJobProperties(TableDesc tableDesc, + public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { try { OutputJobInfo jobInfo = (OutputJobInfo) @@ -114,7 +109,7 @@ // For dynamic partitioned writes without all keyvalues specified, // we create a temp dir for the associated write job if (dynHash != null){ - parentPath = new Path(parentPath, + parentPath = new Path(parentPath, DYNTEMP_DIR_NAME+dynHash).toString(); } @@ -128,16 +123,13 @@ List cols = new ArrayList(); List values = new ArrayList(); - //sort the cols and vals - for(String name: + //Get the output location in the order partition keys are defined for the table. + for(String name: jobInfo.getTableInfo(). getPartitionColumns().getFieldNames()) { String value = jobInfo.getPartitionValues().get(name); - int i=0; - while(i 0) - i++; - cols.add(i,name); - values.add(i,value); + cols.add(name); + values.add(value); } outputLocation = FileUtils.makePartName(cols, values); } @@ -145,7 +137,7 @@ jobInfo.setLocation(new Path(parentPath,outputLocation).toString()); //only set output dir if partition is fully materialized - if(jobInfo.getPartitionValues().size() + if(jobInfo.getPartitionValues().size() == jobInfo.getTableInfo().getPartitionColumns().size()) { jobProperties.put("mapred.output.dir", jobInfo.getLocation()); } @@ -179,7 +171,7 @@ } @Override - public HiveAuthorizationProvider getAuthorizationProvider() + public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException { return new DefaultHiveAuthorizationProvider(); } Index: src/java/org/apache/hcatalog/mapreduce/MultiTableHCatOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/MultiTableHCatOutputFormat.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/MultiTableHCatOutputFormat.java (revision 0) @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.common.ErrorType; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; + +/** + * HCatOutputFormat which supports writing into multiple tables in HCatalog, + * based upon HCatOutputFormat which writes out to a single table. + */ + +public class MultiTableHCatOutputFormat extends HCatOutputFormat { + + public static final String HCAT_TABLE_LIST = "hcat.table.list"; + public static final String HCAT_TABLE_CONF = "hcat.table.conf"; + public static final String HCAT_TABLE_CONF_KEY_DELIM = "%"; + public static final String HCAT_TABLE_CONF_VALUE_DELIM = ";"; + + public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException { + throw new UnsupportedOperationException( + "Please use setOutput(Job job, List outputJobInfos) instead"); + } + + /** + * Sets up the output info for all the tables. + * + * @param job JobContext + * @param outputJobInfos list of table information to which outputformat should write to. + * @throws IOException + */ + public static void setOutput(Job job, List outputJobInfos) + throws IOException { + ArrayList tableList = new ArrayList(); + Configuration conf = job.getConfiguration(); + Configuration initial = new Configuration(job.getConfiguration()); + for (OutputJobInfo outputJobInfo : outputJobInfos) { + HCatOutputFormat.setOutput(job, outputJobInfo); + final String tableName = outputJobInfo.getTableName(); + conf.set(getTableBasedJobInfoProperty(tableName), + conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); + conf.set(getTableBasedConf(tableName), + getDifferentialConf(initial, job.getConfiguration())); + conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, ""); + tableList.add(tableName); + } + conf.set(HCAT_TABLE_LIST, StringUtils.join(tableList, ",")); + } + + static String getTableBasedJobInfoProperty(String tableName) { + return HCatConstants.HCAT_KEY_OUTPUT_INFO + "." + tableName; + } + + static String getTableBasedConf(String tableName) { + return HCAT_TABLE_CONF + "." + tableName; + } + + static String getDifferentialConf(Configuration initial, Configuration modified) { + StringBuilder builder = new StringBuilder(); + for (Entry entry : modified) { + if (entry.getKey().startsWith(HCAT_TABLE_CONF)) + continue; + // We handle OutputJobInfo separately. 
+ if (entry.getKey().startsWith(HCatConstants.HCAT_KEY_OUTPUT_INFO)) + continue; + + String value = initial.get(entry.getKey()); + if (value == null || !value.equals(entry.getValue())) { + //Avoid Variable substitution depth too large error by replacing ${ with $} + builder.append(entry.getKey()).append(HCAT_TABLE_CONF_KEY_DELIM) + .append(entry.getValue().replace("${", "$}")).append(HCAT_TABLE_CONF_VALUE_DELIM); + } + } + // Remove the last extra delimiter + if (builder.length() != 0) + builder.deleteCharAt(builder.length() - 1); + return builder.toString(); + } + + static void addTableConf(Configuration conf, String tableConf) { + String[] config = tableConf.split("[" + HCAT_TABLE_CONF_KEY_DELIM + + HCAT_TABLE_CONF_VALUE_DELIM + "]"); + + for (int i = 0; i < config.length; i += 2) { + //Correct the variables before adding to conf + conf.set(config[i], config[i + 1].replace("$}", "${")); + } + } + + @Override + protected OutputFormat, HCatRecord> getOutputFormat(JobContext context) + throws IOException { + return super.getOutputFormat(context); + } + + /** {@inheritDoc} */ + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + String[] tables = conf.getStrings(HCAT_TABLE_LIST); + for (String table : tables) { + OutputJobInfo info = getJobInfo(context, table); + JobContext clone = cloneJobContextForTable(table, context); + OutputFormat, HCatRecord> outputFormat = getOutputFormat(info, + context); + outputFormat.checkOutputSpecs(clone); + } + } + + /** {@inheritDoc} */ + @Override + public RecordWriter, HCatRecord> getRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return new TableBasedHCatRecordWriter(context); + } + + /** {@inheritDoc} */ + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, + InterruptedException { + return new MultiTableHCatOutputCommitter(context); + } + + public static void setSchema(final Job job, final HCatSchema schema) throws IOException { + throw new UnsupportedOperationException("Please use setSchema(job, schema, tableName) instead"); + } + + /** + * Set the schema for the data being written out to the table partition. The + * table schema is used by default for the partition if this is not called. + * + * @param job job to be launched + * @param schema schema for the data being written + * @param tableName name of the hcatalog table + * @throws IOException + */ + public static void setSchema(final Job job, final HCatSchema schema, String tableName) + throws IOException { + Configuration conf = job.getConfiguration(); + conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, + conf.get(getTableBasedJobInfoProperty(tableName))); + HCatOutputFormat.setSchema(job, schema); + conf.set(getTableBasedJobInfoProperty(tableName), + conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); + } + + static OutputJobInfo getJobInfo(JobContext context, String tableName) throws IOException { + String jobString = context.getConfiguration().get(getTableBasedJobInfoProperty(tableName)); + if (jobString == null) { + throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED); + } + return (OutputJobInfo) HCatUtil.deserialize(jobString); + } + + /** + * Gets the output format instance. 
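To make the per-table configuration packing above easier to follow, here is a standalone plain-Java sketch of what getDifferentialConf and addTableConf do, using made-up property names and no Hadoop types: only keys whose values changed are kept, each entry is written as key '%' value, entries are joined with ';', and "${" is rewritten to "$}" so Hadoop's variable expansion does not reject the packed string. Like the real code, it assumes keys and values contain neither '%' nor ';'.

    import java.util.HashMap;
    import java.util.Map;

    public class TableConfPackingSketch {

      static String pack(Map<String, String> initial, Map<String, String> modified) {
        // The real getDifferentialConf also skips HCatalog's own bookkeeping keys
        // (hcat.table.conf.* and the serialized OutputJobInfo); omitted here for brevity.
        StringBuilder b = new StringBuilder();
        for (Map.Entry<String, String> e : modified.entrySet()) {
          String before = initial.get(e.getKey());
          if (before == null || !before.equals(e.getValue())) {
            b.append(e.getKey()).append('%')
             .append(e.getValue().replace("${", "$}")).append(';');
          }
        }
        if (b.length() > 0) {
          b.deleteCharAt(b.length() - 1);  // drop the trailing ';'
        }
        return b.toString();
      }

      static void unpack(String packed, Map<String, String> target) {
        if (packed.isEmpty()) {
          return;
        }
        String[] parts = packed.split("[%;]");
        for (int i = 0; i < parts.length; i += 2) {
          target.put(parts[i], parts[i + 1].replace("$}", "${"));
        }
      }

      public static void main(String[] args) {
        Map<String, String> initial = new HashMap<String, String>();
        initial.put("mapred.output.dir", "/tmp/old");
        Map<String, String> modified = new HashMap<String, String>(initial);
        modified.put("mapred.output.dir", "/warehouse/table_a/ds=1");  // changed value
        modified.put("example.template", "${user.name}-out");          // new key

        String packed = pack(initial, modified);
        Map<String, String> restored = new HashMap<String, String>();
        unpack(packed, restored);
        System.out.println(packed);
        System.out.println(restored);
      }
    }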
+ * + * @param jobInfo the output job info + * @param context the job context + * @return + * @throws IOException + */ + static OutputFormat, HCatRecord> getOutputFormat(OutputJobInfo jobInfo, + JobContext context) throws IOException { + HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), + jobInfo.getTableInfo().getStorerInfo()); + return storageHandler.getOutputFormatContainer(ReflectionUtils.newInstance( + storageHandler.getOutputFormatClass(), context.getConfiguration())); + } + + static TaskAttemptContext cloneTaskContextForTable(String table, TaskAttemptContext taskContext) { + Configuration conf = taskContext.getConfiguration(); + TaskAttemptContext clone = new TaskAttemptContext(conf, + taskContext.getTaskAttemptID()); + clone.getConfiguration().set( + HCatConstants.HCAT_KEY_OUTPUT_INFO, + conf.get(MultiTableHCatOutputFormat.getTableBasedJobInfoProperty(table))); + addTableConf(clone.getConfiguration(), conf.get(getTableBasedConf(table))); + return clone; + } + + static JobContext cloneJobContextForTable(String table, JobContext jobContext) { + Configuration conf = jobContext.getConfiguration(); + JobContext clone = new JobContext(conf, jobContext.getJobID()); + clone.getConfiguration().set( + HCatConstants.HCAT_KEY_OUTPUT_INFO, + conf.get(MultiTableHCatOutputFormat.getTableBasedJobInfoProperty(table))); + addTableConf(clone.getConfiguration(), conf.get(getTableBasedConf(table))); + return clone; + } +} Index: src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (revision 1303489) +++ src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (working copy) @@ -90,15 +90,18 @@ getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context)); //Cancel HCat and JobTracker tokens + HiveMetaStoreClient client = null; try { HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration()); - HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf); + client = HCatUtil.createHiveClient(hiveConf); String tokenStrForm = client.getTokenStrForm(); if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); } } catch (Exception e) { LOG.warn("Failed to cancel delegation token", e); + } finally { + HCatUtil.closeHiveClientQuietly(client); } } } Index: src/java/org/apache/hcatalog/mapreduce/TableBasedHCatRecordWriter.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/TableBasedHCatRecordWriter.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/TableBasedHCatRecordWriter.java (revision 0) @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.data.HCatRecord; + +/** + * A RecordWriter that writes to multiple hcatalog tables + * + */ +public class TableBasedHCatRecordWriter extends RecordWriter, HCatRecord> { + + private Map, HCatRecord>> baseRecordWriters; + + public TableBasedHCatRecordWriter(TaskAttemptContext context) throws IOException, + InterruptedException { + Configuration conf = context.getConfiguration(); + baseRecordWriters = new HashMap, HCatRecord>>(); + String[] tables = conf.getStrings("hcat.table.list"); + for (String table : tables) { + OutputJobInfo outInfo = MultiTableHCatOutputFormat.getJobInfo(context, table); + TaskAttemptContext clone = MultiTableHCatOutputFormat.cloneTaskContextForTable(table, + context); + Path outputDir = new Path(clone.getConfiguration().get("mapred.output.dir")); + FileSystem fs = outputDir.getFileSystem(clone.getConfiguration()); + if (!fs.exists(outputDir)) { + fs.mkdirs(outputDir); + } + RecordWriter, HCatRecord> baseWriter = MultiTableHCatOutputFormat + .getOutputFormat(outInfo, clone).getRecordWriter(clone); + baseRecordWriters.put(table, baseWriter); + } + } + + @Override + public void write(WritableComparable key, HCatRecord value) throws IOException, + InterruptedException { + String table = ((TableBasedHCatRecord) value).getTableName(); + baseRecordWriters.get(table).write(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + for (String table : baseRecordWriters.keySet()) { + baseRecordWriters.get(table).close(context); + } + } +} Index: src/java/org/apache/hcatalog/mapreduce/TableBasedHCatRecord.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/TableBasedHCatRecord.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/TableBasedHCatRecord.java (revision 0) @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hcatalog.mapreduce; + +import org.apache.hcatalog.data.DefaultHCatRecord; + +/** + * A HCatRecord implementation that associates a table with a record. + * + */ +public class TableBasedHCatRecord extends DefaultHCatRecord { + + private String tableName; + + public TableBasedHCatRecord(String tableName, int size) { + super(size); + this.tableName = tableName; + } + + public String getTableName() { + return tableName; + } +} Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1303489) +++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (working copy) @@ -48,10 +48,13 @@ import org.apache.hcatalog.data.schema.HCatSchemaUtils; import org.apache.hcatalog.har.HarOutputCommitterPostProcessor; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -64,6 +67,7 @@ */ class FileOutputCommitterContainer extends OutputCommitterContainer { + private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class); private final boolean dynamicPartitioningUsed; private boolean partitionsDiscovered; @@ -174,13 +178,14 @@ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); + HiveMetaStoreClient client = null; try { HiveConf hiveConf = HCatUtil.getHiveConf(jobContext.getConfiguration()); - HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf); + client = HCatUtil.createHiveClient(hiveConf); // cancel the deleg. tokens that were acquired for this job now that // we are done - we should cancel if the tokens were acquired by // HCatOutputFormat and not if they were supplied by Oozie. - // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in + // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in // the conf will not be set String tokenStrForm = client.getTokenStrForm(); if(tokenStrForm != null && jobContext.getConfiguration().get @@ -202,6 +207,8 @@ } else { throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); } + } finally { + HCatUtil.closeHiveClientQuietly(client); } Path src; @@ -426,9 +433,7 @@ throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); } } finally { - if( client != null ) { - client.close(); - } + HCatUtil.closeHiveClientQuietly(client); } } @@ -486,19 +491,15 @@ partition.setParameters(params); - // Sets permissions and group name on partition dirs. + // Sets permissions and group name on partition dirs and files. Path partPath = new Path(partLocnRoot); + boolean topLevelDir = true; for(FieldSchema partKey : table.getPartitionKeys()){ partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); -// LOG.info("Setting perms for "+partPath.toString()); - fs.setPermission(partPath, perms); - try{ - fs.setOwner(partPath, null, grpName); - } catch(AccessControlException ace){ - // log the messages before ignoring. Currently, logging is not built in Hcatalog. 
-// LOG.warn(ace); - } + if (topLevelDir) + applyGroupAndPermsRecursively(fs, partPath, perms, grpName); + topLevelDir = false; } if (dynamicPartitioningUsed){ String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs); @@ -516,8 +517,29 @@ return partition; } + private void applyGroupAndPermsRecursively(FileSystem fs, Path dir, FsPermission permission, + String group) + throws IOException { + fs.setPermission(dir, permission); + try { + fs.setOwner(dir, null, group); + } catch (AccessControlException ace) { + LOG.warn("Error changing group of " + dir, ace); + } + for (FileStatus fileStatus : fs.listStatus(dir)) { + if (fileStatus.isDir()) { + applyGroupAndPermsRecursively(fs, fileStatus.getPath(), permission, group); + } else { + fs.setPermission(fileStatus.getPath(), permission); + try { + fs.setOwner(dir, null, group); + } catch (AccessControlException ace) { + LOG.warn("Error changing group of " + dir, ace); + } + } + } + } - private String getFinalDynamicPartitionDestination(Table table, Map partKVs) { // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA -> // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA Index: src/java/org/apache/hcatalog/common/HCatUtil.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatUtil.java (revision 1303489) +++ src/java/org/apache/hcatalog/common/HCatUtil.java (working copy) @@ -622,6 +622,15 @@ throws MetaException { return new HiveMetaStoreClient(hiveConf); } + + public static void closeHiveClientQuietly(HiveMetaStoreClient client) { + try { + if (client != null) + client.close(); + } catch (Exception e) { + LOG.warn("Error closing metastore client", e); + } + } public static HiveConf getHiveConf(Configuration conf)
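For context on the new HCatUtil.closeHiveClientQuietly helper, a sketch of the intended caller pattern, mirroring the try/finally hunks above; the class and method names are placeholders, while the HCatUtil, HCatConstants, and metastore client calls are the ones used in this patch.

    package org.apache.hcatalog.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative caller (placeholder names); shows the create/use/close-in-finally
    // pattern the patch applies in the committer and output-format containers.
    class TokenCleanupSketch {

      private static final Logger LOG = LoggerFactory.getLogger(TokenCleanupSketch.class);

      static void cancelTokenQuietly(Configuration conf) {
        HiveMetaStoreClient client = null;
        try {
          HiveConf hiveConf = HCatUtil.getHiveConf(conf);
          client = HCatUtil.createHiveClient(hiveConf);
          String tokenStrForm = client.getTokenStrForm();
          if (tokenStrForm != null
              && conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
            client.cancelDelegationToken(tokenStrForm);
          }
        } catch (Exception e) {
          LOG.warn("Failed to cancel delegation token", e);
        } finally {
          // Safe even when createHiveClient threw and client is still null.
          HCatUtil.closeHiveClientQuietly(client);
        }
      }
    }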