Index: src/test/org/apache/hcatalog/pig/TestHCatStorer.java =================================================================== --- src/test/org/apache/hcatalog/pig/TestHCatStorer.java (revision 1149353) +++ src/test/org/apache/hcatalog/pig/TestHCatStorer.java (working copy) @@ -585,4 +585,122 @@ assertFalse(itr.hasNext()); } + + + public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException, CommandNeedRetryException{ + + driver.run("drop table if exists employee"); + String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " + + " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " + + "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," + + "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') "; + + int retCode = driver.run(createTable).getResponseCode(); + if(retCode != 0) { + throw new IOException("Failed to create table."); + } + + MiniCluster.deleteFile(cluster, fullFileName); + String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN", + "111238\tKalpana\t01/01/2000\tF\tIN\tKA", + "111239\tSatya\t01/01/2001\tM\tIN\tKL", + "111240\tKavya\t01/01/2002\tF\tIN\tAP"}; + + MiniCluster.createInputFile(cluster, fullFileName, inputData); + PigServer pig = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + pig.setBatchOn(); + pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," + + "emp_gender:chararray,emp_country:chararray,emp_state:chararray);"); + pig.registerQuery("IN = FILTER A BY emp_country == 'IN';"); + pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN');"); + pig.executeBatch(); + driver.run("select * from employee"); + ArrayList results = new ArrayList(); + driver.getResults(results); + assertEquals(4, results.size()); + Collections.sort(results); + assertEquals(inputData[0], results.get(0)); + assertEquals(inputData[1], results.get(1)); + assertEquals(inputData[2], results.get(2)); + assertEquals(inputData[3], results.get(3)); + MiniCluster.deleteFile(cluster, fullFileName); + driver.run("drop table employee"); + } + + public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException, CommandNeedRetryException{ + + driver.run("drop table if exists employee"); + String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " + + " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " + + "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," + + "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') "; + + int retCode = driver.run(createTable).getResponseCode(); + if(retCode != 0) { + throw new IOException("Failed to create table."); + } + + MiniCluster.deleteFile(cluster, fullFileName); + String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN", + "111238\tKalpana\t01/01/2000\tF\tIN\tKA", + "111239\tSatya\t01/01/2001\tM\tIN\tKL", + "111240\tKavya\t01/01/2002\tF\tIN\tAP"}; + + MiniCluster.createInputFile(cluster, fullFileName, inputData); + PigServer pig = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + pig.setBatchOn(); + pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS 
(emp_id:int,emp_name:chararray,emp_start_date:chararray," + + "emp_gender:chararray,emp_country:chararray,emp_state:chararray);"); + pig.registerQuery("IN = FILTER A BY emp_country == 'IN';"); + pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"();"); + pig.executeBatch(); + driver.run("select * from employee"); + ArrayList results = new ArrayList(); + driver.getResults(results); + assertEquals(4, results.size()); + Collections.sort(results); + assertEquals(inputData[0], results.get(0)); + assertEquals(inputData[1], results.get(1)); + assertEquals(inputData[2], results.get(2)); + assertEquals(inputData[3], results.get(3)); + MiniCluster.deleteFile(cluster, fullFileName); + driver.run("drop table employee"); + } + + public void testDynamicPartitioningMultiPartColsNoDataInDataNoSpec() throws IOException, CommandNeedRetryException{ + + driver.run("drop table if exists employee"); + String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " + + " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " + + "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," + + "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') "; + + int retCode = driver.run(createTable).getResponseCode(); + if(retCode != 0) { + throw new IOException("Failed to create table."); + } + + MiniCluster.deleteFile(cluster, fullFileName); + String[] inputData = {}; + + MiniCluster.createInputFile(cluster, fullFileName, inputData); + PigServer pig = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + pig.setBatchOn(); + pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," + + "emp_gender:chararray,emp_country:chararray,emp_state:chararray);"); + pig.registerQuery("IN = FILTER A BY emp_country == 'IN';"); + pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"();"); + pig.executeBatch(); + driver.run("select * from employee"); + ArrayList results = new ArrayList(); + driver.getResults(results); + assertEquals(0, results.size()); + MiniCluster.deleteFile(cluster, fullFileName); + driver.run("drop table employee"); + } + + } Index: src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (revision 1149353) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (working copy) @@ -80,17 +80,17 @@ Map partitionMap = new HashMap(); partitionMap.put("part1", "p1value1"); - runMRCreate(partitionMap, partitionColumns, writeRecords, 10); + runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true); partitionMap.clear(); partitionMap.put("PART1", "p1value2"); - runMRCreate(partitionMap, partitionColumns, writeRecords, 20); + runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); //Test for duplicate publish IOException exc = null; try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20); + runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); } catch(IOException e) { exc = e; } @@ -105,7 +105,7 @@ partitionMap.put("px", "p1value2"); try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20); + runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); } catch(IOException e) { exc = e; } @@ -118,14 +118,15 @@ 
//Test for null partition value map exc = null; try { - runMRCreate(null, partitionColumns, writeRecords, 20); + runMRCreate(null, partitionColumns, writeRecords, 20,false); } catch(IOException e) { exc = e; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); + assertTrue(exc == null); +// assertTrue(exc instanceof HCatException); +// assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType()); + // With Dynamic partitioning, this isn't an error that the keyValues specified didn't values //Read should get 10 + 20 rows runMRRead(30); @@ -166,7 +167,7 @@ Map partitionMap = new HashMap(); partitionMap.put("part1", "p1value5"); - runMRCreate(partitionMap, partitionColumns, writeRecords, 10); + runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true); tableSchema = getTableSchema(); @@ -187,7 +188,7 @@ IOException exc = null; try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20); + runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); } catch(IOException e) { exc = e; } @@ -217,7 +218,7 @@ exc = null; try { - runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20); + runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20,true); } catch(IOException e) { exc = e; } @@ -266,7 +267,7 @@ Exception exc = null; try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 10); + runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true); } catch(IOException e) { exc = e; } @@ -291,7 +292,7 @@ writeRecords.add(new DefaultHCatRecord(objList)); } - runMRCreate(partitionMap, partitionColumns, writeRecords, 10); + runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true); //Read should get 10 + 20 + 10 + 10 + 20 rows runMRRead(70); Index: src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (revision 1149353) +++ src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (working copy) @@ -243,7 +243,7 @@ void runMRCreate(Map partitionValues, List partitionColumns, List records, - int writeCount) throws Exception { + int writeCount, boolean assertWrite) throws Exception { writeRecords = records; MapCreate.writeCount = 0; @@ -275,8 +275,11 @@ //new HCatOutputCommitter(null).setupJob(job); job.waitForCompletion(true); - new HCatOutputCommitter(null).cleanupJob(job); - Assert.assertEquals(writeCount, MapCreate.writeCount); + new HCatOutputCommitter(job,null).cleanupJob(job); + if (assertWrite){ + // we assert only if we expected to assert with this call. + Assert.assertEquals(writeCount, MapCreate.writeCount); + } } List runMRRead(int readCount) throws Exception { Index: src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (revision 0) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (revision 0) @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hcatalog.common.ErrorType; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; + +public class TestHCatDynamicPartitioned extends HCatMapReduceTest { + + private List writeRecords; + private List dataColumns; + + @Override + protected void initialize() throws Exception { + + tableName = "testHCatDynamicPartitionedTable"; + generateWriteRecords(20,5,0); + generateDataColumns(); + } + + private void generateDataColumns() throws HCatException { + dataColumns = new ArrayList(); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""))); + } + + private void generateWriteRecords(int max, int mod,int offset) { + writeRecords = new ArrayList(); + + for(int i = 0;i < max;i++) { + List objList = new ArrayList(); + + objList.add(i); + objList.add("strvalue" + i); + objList.add(String.valueOf((i % mod)+offset)); + writeRecords.add(new DefaultHCatRecord(objList)); + } + } + + @Override + protected List getPartitionKeys() { + List fields = new ArrayList(); + fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")); + return fields; + } + + @Override + protected List getTableColumns() { + List fields = new ArrayList(); + fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); + return fields; + } + + + public void testHCatDynamicPartitionedTable() throws Exception { + + generateWriteRecords(20,5,0); + runMRCreate(null, dataColumns, writeRecords, 20,true); + + runMRRead(20); + + //Read with partition filter + runMRRead(4, "p1 = \"0\""); + runMRRead(8, "p1 = \"1\" or p1 = \"3\""); + runMRRead(4, "p1 = \"4\""); + + // read from hive to test + + String query = "select * from " + tableName; + int retCode = driver.run(query).getResponseCode(); + + if( retCode != 0 ) { + throw new Exception("Error " + retCode + " running query " + query); + } + + ArrayList res = new ArrayList(); + driver.getResults(res); + 
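+    // A Hive-side select sees all 20 rows only if the dynamically created partitions were also registered in the metastore, not just written to HDFS.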
assertEquals(20, res.size()); + + + //Test for duplicate publish + IOException exc = null; + try { + generateWriteRecords(20,5,0); + runMRCreate(null, dataColumns, writeRecords, 20,false); + } catch(IOException e) { + exc = e; + } + + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType()); + } + + public void testHCatDynamicPartitionMaxPartitions() throws Exception { + HiveConf hc = new HiveConf(this.getClass()); + + int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS); + System.out.println("Max partitions allowed = " + maxParts); + + IOException exc = null; + try { + generateWriteRecords(maxParts+5,maxParts+2,10); + runMRCreate(null,dataColumns,writeRecords,maxParts+5,false); + } catch(IOException e) { + exc = e; + } + + if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){ + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType()); + }else{ + assertTrue(exc == null); + runMRRead(maxParts+5); + } + } +} Index: src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (revision 1149353) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (working copy) @@ -160,7 +160,7 @@ } public void publishTest(Job job) throws Exception { - OutputCommitter committer = new HCatOutputCommitter(null); + OutputCommitter committer = new HCatOutputCommitter(job,null); committer.cleanupJob(job); Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1")); Index: src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java (revision 1149353) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java (working copy) @@ -228,7 +228,7 @@ schema); job.waitForCompletion(true); - HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null); + HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null); committer.cleanupJob(job); } @@ -247,7 +247,7 @@ schema); job.waitForCompletion(true); - HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null); + HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null); committer.cleanupJob(job); } Index: src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java (revision 1149353) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java (working copy) @@ -109,7 +109,7 @@ schema); job.waitForCompletion(true); - HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null); + HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null); committer.cleanupJob(job); Path metadataPath = new Path(outputLocation, "_metadata"); @@ -165,7 +165,7 @@ schema); job.waitForCompletion(true); - HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null); + HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null); committer.cleanupJob(job); Path metadataPath = new Path(outputLocation, "_metadata"); Map.Entry> rv = EximUtil.readMetaData(fs, metadataPath); Index: 
src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (revision 1149353) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (working copy) @@ -78,12 +78,12 @@ public void testHCatNonPartitionedTable() throws Exception { Map partitionMap = new HashMap(); - runMRCreate(null, partitionColumns, writeRecords, 10); + runMRCreate(null, partitionColumns, writeRecords, 10,true); //Test for duplicate publish IOException exc = null; try { - runMRCreate(null, partitionColumns, writeRecords, 20); + runMRCreate(null, partitionColumns, writeRecords, 20,true); } catch(IOException e) { exc = e; } @@ -98,7 +98,7 @@ partitionMap.put("px", "p1value2"); try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20); + runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); } catch(IOException e) { exc = e; } Index: src/java/org/apache/hcatalog/pig/HCatEximStorer.java =================================================================== --- src/java/org/apache/hcatalog/pig/HCatEximStorer.java (revision 1149353) +++ src/java/org/apache/hcatalog/pig/HCatEximStorer.java (working copy) @@ -145,7 +145,7 @@ //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob. //Calling it from here so that the partition publish happens. //This call needs to be removed after MAPREDUCE-1447 is fixed. - new HCatEximOutputCommitter(null).cleanupJob(job); + new HCatEximOutputCommitter(job,null).cleanupJob(job); } } } Index: src/java/org/apache/hcatalog/pig/PigHCatUtil.java =================================================================== --- src/java/org/apache/hcatalog/pig/PigHCatUtil.java (revision 1149353) +++ src/java/org/apache/hcatalog/pig/PigHCatUtil.java (working copy) @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Properties; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -405,4 +406,16 @@ } + public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) { + if(p.getProperty(propName) != null){ + config.set(propName, p.getProperty(propName)); + } + } + + public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) { + if(config.get(propName) != null){ + p.setProperty(propName, config.get(propName)); + } + } + } Index: src/java/org/apache/hcatalog/pig/HCatStorer.java =================================================================== --- src/java/org/apache/hcatalog/pig/HCatStorer.java (revision 1149353) +++ src/java/org/apache/hcatalog/pig/HCatStorer.java (working copy) @@ -109,34 +109,35 @@ computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema); HCatOutputFormat.setSchema(job, computedSchema); p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); - if(config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null){ - p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF, config.get(HCatConstants.HCAT_KEY_HIVE_CONF)); - } - if(config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){ - p.setProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, - config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE)); - } + + PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF); + PigHCatUtil.saveConfigIntoUDFProperties(p, 
config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID); + PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE); + PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE); + PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM); + p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema)); }else{ config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO)); - if(p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null){ - config.set(HCatConstants.HCAT_KEY_HIVE_CONF, p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF)); - } - if(p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){ - config.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, - p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE)); - } + + PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF); + PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID); + PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE); + PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE); + PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM); + } } + @Override public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException { if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) { //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob. //Calling it from here so that the partition publish happens. //This call needs to be removed after MAPREDUCE-1447 is fixed. - new HCatOutputCommitter(null).cleanupJob(job); + new HCatOutputCommitter(job,null).cleanupJob(job); } } } Index: src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java =================================================================== --- src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (revision 1149353) +++ src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (working copy) @@ -45,6 +45,7 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; Index: src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java (revision 1149353) +++ src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java (working copy) @@ -19,6 +19,8 @@ import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (working copy) @@ -22,15 +22,23 @@ import java.util.Map; import java.util.Properties; +import org.apache.commons.logging.Log; +import 
org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus.State; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.security.AccessControlException; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatSchema; @@ -40,6 +48,7 @@ */ public abstract class HCatOutputStorageDriver { + /** * Initialize the storage driver with specified properties, default implementation does nothing. * @param context the job context object @@ -103,13 +112,22 @@ * @param jobContext the job context object * @param tableLocation the location of the table * @param partitionValues the partition values + * @param dynHash A unique hash value that represents the dynamic partitioning job used * @return the location String. * @throws IOException Signals that an I/O exception has occurred. */ public String getOutputLocation(JobContext jobContext, - String tableLocation, List partitionCols, Map partitionValues) throws IOException { + String tableLocation, List partitionCols, Map partitionValues, String dynHash) throws IOException { + + String parentPath = tableLocation; + // For dynamic partitioned writes without all keyvalues specified, + // we create a temp dir for the associated write job + if (dynHash != null){ + parentPath = new Path(tableLocation, HCatOutputFormat.DYNTEMP_DIR_NAME+dynHash).toString(); + } - if( partitionValues == null || partitionValues.size() == 0 ) { + // For non-partitioned tables, we send them to the temp dir + if((dynHash == null) && ( partitionValues == null || partitionValues.size() == 0 )) { return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString(); } @@ -120,7 +138,7 @@ String partitionLocation = FileUtils.makePartName(partitionCols, values); - Path path = new Path(tableLocation, partitionLocation); + Path path = new Path(parentPath, partitionLocation); return path.toString(); } @@ -130,4 +148,59 @@ public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{ return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), FileOutputFormat.getUniqueFile(context, "part","")); } + + /** + * Implementation that calls the underlying output committer's setupJob, + * used in lieu of underlying committer's setupJob when using dynamic partitioning + * The default implementation should be overriden by underlying implementations + * that do not use FileOutputCommitter. + * The reason this function exists is so as to allow a storage driver implementor to + * override underlying OutputCommitter's setupJob implementation to allow for + * being called multiple times in a job, to make it idempotent. 
+   * This should be written in a manner that is callable multiple times
+   * from individual tasks without stepping on each other's toes.
+   *
+   * @param context
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void setupOutputCommitterJob(TaskAttemptContext context)
+      throws IOException, InterruptedException{
+    getOutputFormat().getOutputCommitter(context).setupJob(context);
+  }
+
+  /**
+   * Implementation that calls the underlying output committer's cleanupJob,
+   * used in lieu of the underlying committer's cleanupJob when using dynamic partitioning.
+   * This should be written in a manner that is okay to call after having had
+   * multiple underlying output committers write to task dirs inside it.
+   * While the base MR cleanupJob should have sufficed normally, this is provided
+   * in order to let implementors of setupOutputCommitterJob clean up properly.
+   *
+   * @param context
+   * @throws IOException
+   */
+  public void cleanupOutputCommitterJob(TaskAttemptContext context)
+      throws IOException, InterruptedException{
+    getOutputFormat().getOutputCommitter(context).cleanupJob(context);
+  }
+
+  /**
+   * Implementation that calls the underlying output committer's abortJob,
+   * used in lieu of the underlying committer's abortJob when using dynamic partitioning.
+   * This should be written in a manner that is okay to call after having had
+   * multiple underlying output committers write to task dirs inside it.
+   * While the base MR abortJob should have sufficed normally, this is provided
+   * in order to let implementors of setupOutputCommitterJob abort properly.
+   *
+   * @param context
+   * @param state
+   * @throws IOException
+   */
+  public void abortOutputCommitterJob(TaskAttemptContext context, State state)
+      throws IOException, InterruptedException{
+    getOutputFormat().getOutputCommitter(context).abortJob(context,state);
+  }
+
+
 }
Index: src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java	(revision 1149353)
+++ src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java	(working copy)
@@ -19,6 +19,7 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -66,6 +67,10 @@
   /** The partition values to publish to, if used for output*/
   private Map<String, String> partitionValues;
 
+  /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+  private List<String> dynamicPartitioningKeys;
+
+
   /**
    * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
    * for reading data from a table.
@@ -229,6 +234,27 @@ return serverKerberosPrincipal; } + /** + * Returns whether or not Dynamic Partitioning is used + * @return whether or not dynamic partitioning is currently enabled and used + */ + public boolean isDynamicPartitioningUsed() { + return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty())); + } + + /** + * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys + * @param dynamicPartitioningKeys + */ + public void setDynamicPartitioningKeys(List dynamicPartitioningKeys) { + this.dynamicPartitioningKeys = dynamicPartitioningKeys; + } + + public List getDynamicPartitioningKeys(){ + return this.dynamicPartitioningKeys; + } + + @Override public int hashCode() { int result = 17; @@ -240,8 +266,9 @@ result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode()); result = 31*result + tableInfoType.ordinal(); result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode()); + result = 31*result + (dynamicPartitioningKeys == null ? 0 : dynamicPartitioningKeys.hashCode()); return result; + } - } } Index: src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (working copy) @@ -20,9 +20,14 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; @@ -36,6 +41,8 @@ public abstract class HCatBaseOutputFormat extends OutputFormat, HCatRecord> { +// static final private Log LOG = LogFactory.getLog(HCatBaseOutputFormat.class); + /** * Gets the table schema for the table specified in the HCatOutputFormat.setOutput call * on the specified job context. 
@@ -83,7 +90,7 @@ * @return the OutputJobInfo object * @throws IOException the IO exception */ - static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException { + public static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException { String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); if( jobString == null ) { throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED); @@ -102,33 +109,107 @@ @SuppressWarnings("unchecked") static HCatOutputStorageDriver getOutputDriverInstance( JobContext jobContext, OutputJobInfo jobInfo) throws IOException { + return getOutputDriverInstance(jobContext,jobInfo,(List)null); + } + + /** + * Gets the output storage driver instance, with allowing specification of missing dynamic partvals + * @param jobContext the job context + * @param jobInfo the output job info + * @return the output driver instance + * @throws IOException + */ + @SuppressWarnings("unchecked") + static HCatOutputStorageDriver getOutputDriverInstance( + JobContext jobContext, OutputJobInfo jobInfo, List dynamicPartVals) throws IOException { try { Class driverClass = (Class) Class.forName(jobInfo.getStorerInfo().getOutputSDClass()); HCatOutputStorageDriver driver = driverClass.newInstance(); + Map partitionValues = jobInfo.getTableInfo().getPartitionValues(); + String location = jobInfo.getLocation(); + + if (dynamicPartVals != null){ + // dynamic part vals specified + List dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys(); + if (dynamicPartVals.size() != dynamicPartKeys.size()){ + throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, + "Unable to instantiate dynamic partitioning storage driver, mismatch between" + + " number of partition values obtained["+dynamicPartVals.size() + + "] and number of partition values required["+dynamicPartKeys.size()+"]"); + } + for (int i = 0; i < dynamicPartKeys.size(); i++){ + partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i)); + } + + // re-home location, now that we know the rest of the partvals + Table table = jobInfo.getTable(); + + List partitionCols = new ArrayList(); + for(FieldSchema schema : table.getPartitionKeys()) { + partitionCols.add(schema.getName()); + } + + location = driver.getOutputLocation(jobContext, + table.getSd().getLocation() , partitionCols, + partitionValues,jobContext.getConfiguration().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)); + } + //Initialize the storage driver driver.setSchema(jobContext, jobInfo.getOutputSchema()); - driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues()); - driver.setOutputPath(jobContext, jobInfo.getLocation()); + driver.setPartitionValues(jobContext, partitionValues); + driver.setOutputPath(jobContext, location); + +// HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues); driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties()); return driver; } catch(Exception e) { + if (e instanceof HCatException){ + throw (HCatException)e; + }else{ throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e); + } } } + /** + * Gets the output storage driver instance, with allowing specification + * of partvals from which it picks the dynamic partvals + * @param jobContext the job context + * @param jobInfo the output job info + * @return the output driver instance + * @throws IOException + */ + + protected static HCatOutputStorageDriver getOutputDriverInstance( + JobContext context, OutputJobInfo jobInfo, + Map fullPartSpec) throws 
IOException { + List dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys(); + if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){ + return getOutputDriverInstance(context,jobInfo,(List)null); + }else{ + List dynKeyVals = new ArrayList(); + for (String dynamicPartKey : dynamicPartKeys){ + dynKeyVals.add(fullPartSpec.get(dynamicPartKey)); + } + return getOutputDriverInstance(context,jobInfo,dynKeyVals); + } + } + + protected static void setPartDetails(OutputJobInfo jobInfo, final HCatSchema schema, Map partMap) throws HCatException, IOException { List posOfPartCols = new ArrayList(); + List posOfDynPartCols = new ArrayList(); // If partition columns occur in data, we want to remove them. // So, find out positions of partition columns in schema provided by user. // We also need to update the output Schema with these deletions. - + // Note that, output storage drivers never sees partition columns in data // or schema. @@ -140,8 +221,26 @@ schemaWithoutParts.remove(schema.get(partKey)); } } + + // Also, if dynamic partitioning is being used, we want to + // set appropriate list of columns for the columns to be dynamically specified. + // These would be partition keys too, so would also need to be removed from + // output schema and partcols + + if (jobInfo.getTableInfo().isDynamicPartitioningUsed()){ + for (String partKey : jobInfo.getTableInfo().getDynamicPartitioningKeys()){ + Integer idx; + if((idx = schema.getPosition(partKey)) != null){ + posOfPartCols.add(idx); + posOfDynPartCols.add(idx); + schemaWithoutParts.remove(schema.get(partKey)); + } + } + } + HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts); jobInfo.setPosOfPartCols(posOfPartCols); + jobInfo.setPosOfDynPartCols(posOfDynPartCols); jobInfo.setOutputSchema(schemaWithoutParts); } } Index: src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (working copy) @@ -56,7 +56,12 @@ * data contains partition columns.*/ private List posOfPartCols; + private List posOfDynPartCols; + private int maxDynamicPartitions; + + private boolean harRequested; + /** * @return the posOfPartCols */ @@ -65,6 +70,13 @@ } /** + * @return the posOfDynPartCols + */ + protected List getPosOfDynPartCols() { + return posOfDynPartCols; + } + + /** * @param posOfPartCols the posOfPartCols to set */ protected void setPosOfPartCols(List posOfPartCols) { @@ -78,6 +90,14 @@ this.posOfPartCols = posOfPartCols; } + /** + * @param posOfDynPartCols the posOfDynPartCols to set + */ + protected void setPosOfDynPartCols(List posOfDynPartCols) { + // Important - no sorting here! 
We retain order, it's used to match with values at runtime + this.posOfDynPartCols = posOfDynPartCols; + } + public OutputJobInfo(HCatTableInfo tableInfo, HCatSchema outputSchema, HCatSchema tableSchema, StorerInfo storerInfo, String location, Table table) { super(); @@ -139,4 +159,36 @@ return table; } + /** + * Set maximum number of allowable dynamic partitions + * @param maxDynamicPartitions + */ + public void setMaximumDynamicPartitions(int maxDynamicPartitions){ + this.maxDynamicPartitions = maxDynamicPartitions; + } + + /** + * Returns maximum number of allowable dynamic partitions + * @return maximum number of allowable dynamic partitions + */ + public int getMaxDynamicPartitions() { + return this.maxDynamicPartitions; + } + + /** + * Sets whether or not hadoop archiving has been requested for this job + * @param harRequested + */ + public void setHarRequested(boolean harRequested){ + this.harRequested = harRequested; + } + + /** + * Returns whether or not hadoop archiving has been requested for this job + * @return whether or not hadoop archiving has been requested for this job + */ + public boolean getHarRequested() { + return this.harRequested; + } + } Index: src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (working copy) @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatException; @@ -47,8 +48,8 @@ private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class); - public HCatEximOutputCommitter(OutputCommitter baseCommitter) { - super(baseCommitter); + public HCatEximOutputCommitter(JobContext context, OutputCommitter baseCommitter) { + super(context,baseCommitter); } @Override Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (working copy) @@ -21,11 +21,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Map.Entry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -54,6 +57,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatException; @@ -66,9 +70,15 @@ * and should be given as null. 
The value is the HCatRecord to write.*/ public class HCatOutputFormat extends HCatBaseOutputFormat { +// static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class); + /** The directory under which data is initially written for a non partitioned table */ protected static final String TEMP_DIR_NAME = "_TEMP"; - private static Map> tokenMap = new HashMap>(); + + /** */ + protected static final String DYNTEMP_DIR_NAME = "_DYN"; + + private static Map> tokenMap = new HashMap>(); private static final PathFilter hiddenFileFilter = new PathFilter(){ public boolean accept(Path p){ @@ -76,6 +86,9 @@ return !name.startsWith("_") && !name.startsWith("."); } }; + + private static int maxDynamicPartitions; + private static boolean harRequested; /** * Set the info about the output to write for the Job. This queries the metadata server @@ -90,19 +103,60 @@ try { - Configuration conf = job.getConfiguration(); + Configuration conf = job.getConfiguration(); client = createHiveClient(outputInfo.getServerUri(), conf); Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName()); - if( outputInfo.getPartitionValues() == null ) { + if (table.getPartitionKeysSize() == 0 ){ + if ((outputInfo.getPartitionValues() != null) && (!outputInfo.getPartitionValues().isEmpty())){ + // attempt made to save partition values in non-partitioned table - throw error. + throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, + "Partition values specified for non-partitioned table"); + } + // non-partitioned table outputInfo.setPartitionValues(new HashMap()); + } else { - //Convert user specified map to have lower case key names + // partitioned table, we expect partition values + // convert user specified map to have lower case key names Map valueMap = new HashMap(); - for(Map.Entry entry : outputInfo.getPartitionValues().entrySet()) { - valueMap.put(entry.getKey().toLowerCase(), entry.getValue()); + if (outputInfo.getPartitionValues() != null){ + for(Map.Entry entry : outputInfo.getPartitionValues().entrySet()) { + valueMap.put(entry.getKey().toLowerCase(), entry.getValue()); + } } + if ( + (outputInfo.getPartitionValues() == null) + || (outputInfo.getPartitionValues().size() < table.getPartitionKeysSize()) + ){ + // dynamic partition usecase - partition values were null, or not all were specified + // need to figure out which keys are not specified. + List dynamicPartitioningKeys = new ArrayList(); + boolean firstItem = true; + for (FieldSchema fs : table.getPartitionKeys()){ + if (!valueMap.containsKey(fs.getName().toLowerCase())){ + dynamicPartitioningKeys.add(fs.getName().toLowerCase()); + } + } + + if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){ + // If this isn't equal, then bogus key values have been inserted, error out. 
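+          // (i.e. the supplied partition spec contains keys that are not partition columns of this table)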
+ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified"); + } + + outputInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys); + String dynHash; + if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){ + dynHash = String.valueOf(Math.random()); +// LOG.info("New dynHash : ["+dynHash+"]"); +// }else{ +// LOG.info("Old dynHash : ["+dynHash+"]"); + } + conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash); + + } + outputInfo.setPartitionValues(valueMap); } @@ -125,11 +179,13 @@ String tblLocation = tblSD.getLocation(); String location = driver.getOutputLocation(job, tblLocation, partitionCols, - outputInfo.getPartitionValues()); + outputInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)); //Serialize the output info into the configuration OutputJobInfo jobInfo = new OutputJobInfo(outputInfo, tableSchema, tableSchema, storerInfo, location, table); + jobInfo.setHarRequested(harRequested); + jobInfo.setMaximumDynamicPartitions(maxDynamicPartitions); conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo)); Path tblPath = new Path(tblLocation); @@ -176,6 +232,7 @@ // TableInfo, we can have as many tokens as there are stores and the TokenSelector // will correctly pick the right tokens which the committer will use and // cancel. + String tokenSignature = getTokenSignature(outputInfo); if(tokenMap.get(tokenSignature) == null) { // get delegation tokens from hcat server and store them into the "job" @@ -183,19 +240,32 @@ // hcat // when the JobTracker in Hadoop MapReduce starts supporting renewal of // arbitrary tokens, the renewer should be the principal of the JobTracker - String tokenStrForm = client.getDelegationToken(ugi.getUserName()); - Token t = new Token(); - t.decodeFromUrlString(tokenStrForm); - t.setService(new Text(tokenSignature)); - tokenMap.put(tokenSignature, t); + tokenMap.put(tokenSignature, HCatUtil.extractThriftToken( + client.getDelegationToken(ugi.getUserName()), + tokenSignature)); } + + String jcTokenSignature = "jc."+tokenSignature; + if(tokenMap.get(jcTokenSignature) == null) { + tokenMap.put(jcTokenSignature, + HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName())); + } + job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature), tokenMap.get(tokenSignature)); // this will be used by the outputcommitter to pass on to the metastore client // which in turn will pass on to the TokenSelector so that it can select // the right token. + job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature), + tokenMap.get(jcTokenSignature)); + job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature); - } + job.getConfiguration().set(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature); + job.getConfiguration().set(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM, tokenMap.get(jcTokenSignature).encodeToUrlString()); + +// LOG.info("Set hive dt["+tokenSignature+"]"); +// LOG.info("Set jt dt["+jcTokenSignature+"]"); + } } } catch(Exception e) { if( e instanceof HCatException ) { @@ -207,10 +277,10 @@ if( client != null ) { client.close(); } +// HCatUtil.logAllTokens(LOG,job); } } - // a signature string to associate with a HCatTableInfo - essentially // a concatenation of dbname, tablename and partition keyvalues. private static String getTokenSignature(HCatTableInfo outputInfo) { @@ -232,11 +302,10 @@ return result.toString(); } - - /** * Handles duplicate publish of partition. 
Fails if partition already exists. * For non partitioned tables, fails if files are present in table directory. + * For dynamic partitioned publish, does nothing - check would need to be done at recordwriter time * @param job the job * @param outputInfo the output info * @param client the metastore client @@ -247,18 +316,33 @@ */ private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo, HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException { - List partitionValues = HCatOutputCommitter.getPartitionValueList( - table, outputInfo.getPartitionValues()); + /* + * For fully specified ptn, follow strict checks for existence of partitions in metadata + * For unpartitioned tables, follow filechecks + * For partially specified tables: + * This would then need filechecks at the start of a ptn write, + * Doing metadata checks can get potentially very expensive (fat conf) if + * there are a large number of partitions that match the partial specifications + */ + if( table.getPartitionKeys().size() > 0 ) { - //For partitioned table, fail if partition is already present - List currentParts = client.listPartitionNames(outputInfo.getDatabaseName(), - outputInfo.getTableName(), partitionValues, (short) 1); + if (!outputInfo.isDynamicPartitioningUsed()){ + List partitionValues = HCatOutputCommitter.getPartitionValueList( + table, outputInfo.getPartitionValues()); + // fully-specified partition + List currentParts = client.listPartitionNames(outputInfo.getDatabaseName(), + outputInfo.getTableName(), partitionValues, (short) 1); - if( currentParts.size() > 0 ) { - throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION); + if( currentParts.size() > 0 ) { + throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION); + } } } else { + List partitionValues = HCatOutputCommitter.getPartitionValueList( + table, outputInfo.getPartitionValues()); + // non-partitioned table + Path tablePath = new Path(table.getSd().getLocation()); FileSystem fs = tablePath.getFileSystem(job.getConfiguration()); @@ -299,24 +383,12 @@ getRecordWriter(TaskAttemptContext context ) throws IOException, InterruptedException { - // First create the RW. HCatRecordWriter rw = new HCatRecordWriter(context); - - // Now set permissions and group on freshly created files. - OutputJobInfo info = getJobInfo(context); - Path workFile = rw.getStorageDriver().getWorkFilePath(context,info.getLocation()); - Path tblPath = new Path(info.getTable().getSd().getLocation()); - FileSystem fs = tblPath.getFileSystem(context.getConfiguration()); - FileStatus tblPathStat = fs.getFileStatus(tblPath); - fs.setPermission(workFile, tblPathStat.getPermission()); - try{ - fs.setOwner(workFile, null, tblPathStat.getGroup()); - } catch(AccessControlException ace){ - // log the messages before ignoring. Currently, logging is not built in HCat. - } + rw.prepareForStorageDriverOutput(context); return rw; } + /** * Get the output committer for this output format. This is responsible * for ensuring the output is committed correctly. @@ -329,10 +401,17 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context ) throws IOException, InterruptedException { OutputFormat, ? 
super Writable> outputFormat = getOutputFormat(context); - return new HCatOutputCommitter(outputFormat.getOutputCommitter(context)); + return new HCatOutputCommitter(context,outputFormat.getOutputCommitter(context)); } static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException { + HiveConf hiveConf = getHiveConf(url, conf); +// HCatUtil.logHiveConf(LOG, hiveConf); + return new HiveMetaStoreClient(hiveConf); + } + + + private static HiveConf getHiveConf(String url, Configuration conf) throws IOException { HiveConf hiveConf = new HiveConf(HCatOutputFormat.class); if( url != null ) { @@ -372,9 +451,48 @@ } } + + // figure out what the maximum number of partitions allowed is, so we can pass it on to our outputinfo + if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){ + maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS); + }else{ + maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions + } + harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED); + return hiveConf; + } - return new HiveMetaStoreClient(hiveConf); + /** + * Any initialization of file paths, set permissions and group on freshly created files + * This is called at RecordWriter instantiation time which can be at write-time for + * a dynamic partitioning usecase + * @param context + * @throws IOException + */ + public static void prepareOutputLocation(HCatOutputStorageDriver osd, TaskAttemptContext context) throws IOException { + OutputJobInfo info = HCatBaseOutputFormat.getJobInfo(context); +// Path workFile = osd.getWorkFilePath(context,info.getLocation()); + Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir")); + Path tblPath = new Path(info.getTable().getSd().getLocation()); + FileSystem fs = tblPath.getFileSystem(context.getConfiguration()); + FileStatus tblPathStat = fs.getFileStatus(tblPath); + +// LOG.info("Attempting to set permission ["+tblPathStat.getPermission()+"] on ["+ +// workFile+"], location=["+info.getLocation()+"] , mapred.locn =["+ +// context.getConfiguration().get("mapred.output.dir")+"]"); +// +// FileStatus wFileStatus = fs.getFileStatus(workFile); +// LOG.info("Table : "+tblPathStat.getPath()); +// LOG.info("Working File : "+wFileStatus.getPath()); + + fs.setPermission(workFile, tblPathStat.getPermission()); + try{ + fs.setOwner(workFile, null, tblPathStat.getGroup()); + } catch(AccessControlException ace){ + // log the messages before ignoring. Currently, logging is not built in HCat. 
+ } } + } Index: src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (working copy) @@ -18,60 +18,174 @@ package org.apache.hcatalog.mapreduce; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.HashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; public class HCatRecordWriter extends RecordWriter, HCatRecord> { private final HCatOutputStorageDriver storageDriver; - /** - * @return the storageDriver - */ - public HCatOutputStorageDriver getStorageDriver() { - return storageDriver; - } + private boolean dynamicPartitioningUsed = false; + +// static final private Log LOG = LogFactory.getLog(HCatRecordWriter.class); + private final RecordWriter, ? super Writable> baseWriter; + private final Map, ? super Writable>> baseDynamicWriters; + private final Map baseDynamicStorageDrivers; + private final List partColsToDel; + private final List dynamicPartCols; + private int maxDynamicPartitions; + private OutputJobInfo jobInfo; + private TaskAttemptContext context; + public HCatRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + jobInfo = HCatOutputFormat.getJobInfo(context); + this.context = context; // If partition columns occur in data, we want to remove them. partColsToDel = jobInfo.getPosOfPartCols(); + dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed(); + dynamicPartCols = jobInfo.getPosOfDynPartCols(); + maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); - if(partColsToDel == null){ + if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){ throw new HCatException("It seems that setSchema() is not called on " + "HCatOutputFormat. Please make sure that method is called."); } + - this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo); - this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context); + if (!dynamicPartitioningUsed){ + this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo); + this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context); + this.baseDynamicStorageDrivers = null; + this.baseDynamicWriters = null; + }else{ + this.baseDynamicStorageDrivers = new HashMap(); + this.baseDynamicWriters = new HashMap, ? super Writable>>(); + this.storageDriver = null; + this.baseWriter = null; + } + } + /** + * @return the storageDriver + */ + public HCatOutputStorageDriver getStorageDriver() { + return storageDriver; + } + @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { + if (dynamicPartitioningUsed){ + for (RecordWriter, ? 
super Writable> bwriter : baseDynamicWriters.values()){ + bwriter.close(context); + } + for (HCatOutputStorageDriver osd : baseDynamicStorageDrivers.values()){ + OutputCommitter baseOutputCommitter = osd.getOutputFormat().getOutputCommitter(context); + if (baseOutputCommitter.needsTaskCommit(context)){ + baseOutputCommitter.commitTask(context); + } + } + } else { baseWriter.close(context); + } } @Override public void write(WritableComparable key, HCatRecord value) throws IOException, InterruptedException { + RecordWriter, ? super Writable> localWriter; + HCatOutputStorageDriver localDriver; + +// HCatUtil.logList(LOG, "HCatRecord to write", value.getAll()); + if (dynamicPartitioningUsed){ + // calculate which writer to use from the remaining values - this needs to be done before we delete cols + + List dynamicPartValues = new ArrayList(); + for (Integer colToAppend : dynamicPartCols){ + dynamicPartValues.add(value.get(colToAppend).toString()); + } + + int dynHashCode = dynamicPartValues.hashCode(); + if (!baseDynamicWriters.containsKey(dynHashCode)){ +// LOG.info("Creating new storage driver["+baseDynamicStorageDrivers.size() +// +"/"+maxDynamicPartitions+ "] for "+dynamicPartValues.toString()); + if ((maxDynamicPartitions != -1) && (baseDynamicStorageDrivers.size() > maxDynamicPartitions)){ + throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, + "Number of dynamic partitions being created " + + "exceeds configured max allowable partitions[" + + maxDynamicPartitions + + "], increase parameter [" + + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + + "] if needed."); + } +// HCatUtil.logList(LOG, "dynamicpartvals", dynamicPartValues); +// HCatUtil.logList(LOG, "dynamicpartCols", dynamicPartCols); + + HCatOutputStorageDriver localOsd = createDynamicStorageDriver(dynamicPartValues); + RecordWriter, ? super Writable> baseRecordWriter + = localOsd.getOutputFormat().getRecordWriter(context); + localOsd.setupOutputCommitterJob(context); + OutputCommitter baseOutputCommitter = localOsd.getOutputFormat().getOutputCommitter(context); + baseOutputCommitter.setupTask(context); + prepareForStorageDriverOutput(localOsd,context); + baseDynamicWriters.put(dynHashCode, baseRecordWriter); + baseDynamicStorageDrivers.put(dynHashCode,localOsd); + } + + localWriter = baseDynamicWriters.get(dynHashCode); + localDriver = baseDynamicStorageDrivers.get(dynHashCode); + }else{ + localWriter = baseWriter; + localDriver = storageDriver; + } + for(Integer colToDel : partColsToDel){ value.remove(colToDel); } - //The key given by user is ignored - WritableComparable generatedKey = storageDriver.generateKey(value); - Writable convertedValue = storageDriver.convertValue(value); - baseWriter.write(generatedKey, convertedValue); + + //The key given by user is ignored + WritableComparable generatedKey = localDriver.generateKey(value); + Writable convertedValue = localDriver.convertValue(value); + localWriter.write(generatedKey, convertedValue); } + + protected HCatOutputStorageDriver createDynamicStorageDriver(List dynamicPartVals) throws IOException { + HCatOutputStorageDriver localOsd = HCatOutputFormat.getOutputDriverInstance(context,jobInfo,dynamicPartVals); + return localOsd; + } + + public void prepareForStorageDriverOutput(TaskAttemptContext context) throws IOException { + // Set permissions and group on freshly created files. 
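
The heart of the dynamic-partitioning write path in HCatRecordWriter.write() above is a lazily populated cache: one base RecordWriter per distinct combination of dynamic partition values, created the first time that combination is seen and bounded by the configured maximum. The fragment below is a simplified, self-contained sketch of that caching pattern; it keys the map by the value list itself rather than by its hash code, and the WriterFactory interface is invented purely for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DynamicWriterCacheSketch<W> {

  /** Invented for illustration: creates a writer for one partition-value combination. */
  public interface WriterFactory<W> {
    W create(List<String> partitionValues) throws IOException;
  }

  private final Map<List<String>, W> writers = new HashMap<List<String>, W>();
  private final WriterFactory<W> factory;
  private final int maxDynamicPartitions;   // -1 means "no limit", as in the patch

  public DynamicWriterCacheSketch(WriterFactory<W> factory, int maxDynamicPartitions) {
    this.factory = factory;
    this.maxDynamicPartitions = maxDynamicPartitions;
  }

  /** Return the writer for this record's dynamic partition values, creating it on first use. */
  public W writerFor(List<String> dynamicPartValues) throws IOException {
    List<String> key = new ArrayList<String>(dynamicPartValues);
    W writer = writers.get(key);
    if (writer == null) {
      if (maxDynamicPartitions != -1 && writers.size() >= maxDynamicPartitions) {
        throw new IOException("Attempt to create more than " + maxDynamicPartitions
            + " dynamic partitions in a single task");
      }
      writer = factory.create(key);
      writers.put(key, writer);
    }
    return writer;
  }
}
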
+ if (!dynamicPartitioningUsed){ + HCatOutputStorageDriver localOsd = this.getStorageDriver(); + prepareForStorageDriverOutput(localOsd,context); + } + } + + private void prepareForStorageDriverOutput(HCatOutputStorageDriver localOsd, + TaskAttemptContext context) throws IOException { + HCatOutputFormat.prepareOutputLocation(localOsd,context); + } } Index: src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java (working copy) @@ -34,7 +34,7 @@ /** The underlying output committer */ protected final OutputCommitter baseCommitter; - public HCatBaseOutputCommitter(OutputCommitter baseCommitter) { + public HCatBaseOutputCommitter(JobContext context, OutputCommitter baseCommitter) { this.baseCommitter = baseCommitter; } Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (working copy) @@ -21,26 +21,35 @@ import java.net.URI; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.Constants; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.JobStatus.State; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.AccessControlException; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; @@ -49,49 +58,105 @@ import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.har.HarOutputCommitterPostProcessor; import org.apache.thrift.TException; public class HCatOutputCommitter extends OutputCommitter { +// static final private Log LOG = LogFactory.getLog(HCatOutputCommitter.class); + /** The underlying output committer */ private final OutputCommitter baseCommitter; - public HCatOutputCommitter(OutputCommitter baseCommitter) { + private final boolean dynamicPartitioningUsed; + private boolean partitionsDiscovered; + + private Map> 
partitionsDiscoveredByPath; + private Map storageDriversDiscoveredByPath; + + HarOutputCommitterPostProcessor harProcessor = new HarOutputCommitterPostProcessor(); + + private String ptnRootLocation = null; + + public HCatOutputCommitter(JobContext context, OutputCommitter baseCommitter) throws IOException { + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed(); + if (!dynamicPartitioningUsed){ this.baseCommitter = baseCommitter; + this.partitionsDiscovered = true; + }else{ + this.baseCommitter = null; + this.partitionsDiscovered = false; + } } @Override public void abortTask(TaskAttemptContext context) throws IOException { + if (!dynamicPartitioningUsed){ baseCommitter.abortTask(context); + } } @Override public void commitTask(TaskAttemptContext context) throws IOException { + if (!dynamicPartitioningUsed){ baseCommitter.commitTask(context); + }else{ + // called explicitly through HCatRecordWriter.close() if dynamic + } } @Override public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { + if (!dynamicPartitioningUsed){ return baseCommitter.needsTaskCommit(context); + }else{ + // called explicitly through HCatRecordWriter.close() if dynamic - return false by default + return false; + } } @Override public void setupJob(JobContext context) throws IOException { - if( baseCommitter != null ) { - baseCommitter.setupJob(context); - } + if( baseCommitter != null ) { + baseCommitter.setupJob(context); + } + // in dynamic usecase, called through HCatRecordWriter } @Override public void setupTask(TaskAttemptContext context) throws IOException { + if (!dynamicPartitioningUsed){ baseCommitter.setupTask(context); - } + }else{ + // called explicitly through HCatRecordWriter.write() if dynamic + } + } @Override public void abortJob(JobContext jobContext, State state) throws IOException { + + if (dynamicPartitioningUsed){ + discoverPartitions(jobContext); + } + if(baseCommitter != null) { baseCommitter.abortJob(jobContext, state); + }else{ + if (dynamicPartitioningUsed){ + for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){ + try { + baseOsd.abortOutputCommitterJob( + new TaskAttemptContext( + jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation) + ),state); + } catch (Exception e) { + throw new IOException(e); + } + } + } } + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); try { @@ -106,6 +171,13 @@ (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); } + + String jcTokenStrForm = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM); + String jcTokenSignature = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE); + if(jcTokenStrForm != null && jcTokenSignature != null) { + HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature); + } + } catch(Exception e) { if( e instanceof HCatException ) { throw (HCatException) e; @@ -114,8 +186,16 @@ } } - Path src = new Path(jobInfo.getLocation()); + Path src; + if (dynamicPartitioningUsed){ + src = new Path(getPartitionRootLocation( + jobInfo.getLocation().toString(),jobInfo.getTable().getPartitionKeysSize() + )); + }else{ + src = new Path(jobInfo.getLocation()); + } FileSystem fs = src.getFileSystem(jobContext.getConfiguration()); +// LOG.warn("abortJob about to delete ["+src.toString() +"]"); fs.delete(src, true); } @@ -130,6 +210,10 @@ @Override public void 
commitJob(JobContext jobContext) throws IOException { + if (dynamicPartitioningUsed){ + discoverPartitions(jobContext); + } + if(baseCommitter != null) { baseCommitter.commitJob(jobContext); } @@ -153,12 +237,15 @@ @Override public void cleanupJob(JobContext context) throws IOException { + if (dynamicPartitioningUsed){ + discoverPartitions(context); + } + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); Configuration conf = context.getConfiguration(); Table table = jobInfo.getTable(); - StorageDescriptor tblSD = table.getSd(); - Path tblPath = new Path(tblSD.getLocation()); + Path tblPath = new Path(table.getSd().getLocation()); FileSystem fs = tblPath.getFileSystem(conf); if( table.getPartitionKeys().size() == 0 ) { @@ -166,75 +253,116 @@ if( baseCommitter != null ) { baseCommitter.cleanupJob(context); + }else{ + if (dynamicPartitioningUsed){ + for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){ + try { + baseOsd.cleanupOutputCommitterJob( + new TaskAttemptContext( + context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation) + )); + } catch (Exception e) { + throw new IOException(e); + } + } + } } - + //Move data from temp directory the actual table directory //No metastore operation required. Path src = new Path(jobInfo.getLocation()); - moveTaskOutputs(fs, src, src, tblPath); + moveTaskOutputs(fs, src, src, tblPath,false); fs.delete(src, true); return; } HiveMetaStoreClient client = null; List values = null; - boolean partitionAdded = false; HCatTableInfo tableInfo = jobInfo.getTableInfo(); + List partitionsAdded = new ArrayList(); + try { client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf); StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters()); - Partition partition = new Partition(); - partition.setDbName(tableInfo.getDatabaseName()); - partition.setTableName(tableInfo.getTableName()); - partition.setSd(new StorageDescriptor(tblSD)); - partition.getSd().setLocation(jobInfo.getLocation()); - updateTableSchema(client, table, jobInfo.getOutputSchema()); + + FileStatus tblStat = fs.getFileStatus(tblPath); + String grpName = tblStat.getGroup(); + FsPermission perms = tblStat.getPermission(); - List fields = new ArrayList(); - for(HCatFieldSchema fieldSchema : jobInfo.getOutputSchema().getFields()) { - fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema)); + List partitionsToAdd = new ArrayList(); + if (!dynamicPartitioningUsed){ + partitionsToAdd.add( + constructPartition( + context, + tblPath.toString(), tableInfo.getPartitionValues() + ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) + ,table, fs + ,grpName,perms)); + }else{ + for (Entry> entry : partitionsDiscoveredByPath.entrySet()){ + partitionsToAdd.add( + constructPartition( + context, + getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue() + ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) + ,table, fs + ,grpName,perms)); + } } - partition.getSd().setCols(fields); + //Publish the new partition(s) + if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){ + + Path src = new Path(ptnRootLocation); - Map partKVs = tableInfo.getPartitionValues(); - //Get partition value list - partition.setValues(getPartitionValueList(table,partKVs)); + // check here for each dir we're copying out, to see if it already exists, error out if so + moveTaskOutputs(fs, src, src, tblPath,true); + + moveTaskOutputs(fs, src, src, tblPath,false); + fs.delete(src, true); + + 
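
The back-to-back moveTaskOutputs() calls above implement a two-pass publish: the first pass (dryRun=true) only checks whether any destination already exists, so a duplicate publish fails before a single file has moved; the second pass then performs the renames. A stripped-down sketch of the same pattern, with hypothetical directory names:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TwoPassPublishSketch {

  /** Move every child of scratchDir under tableDir, failing up front on any conflict. */
  public static void publish(Configuration conf, Path scratchDir, Path tableDir) throws IOException {
    FileSystem fs = scratchDir.getFileSystem(conf);
    FileStatus[] children = fs.listStatus(scratchDir);
    if (children == null) {
      return;
    }

    // Pass 1: dry run - refuse to publish if any target already exists.
    for (FileStatus child : children) {
      Path target = new Path(tableDir, child.getPath().getName());
      if (fs.exists(target)) {
        throw new IOException("Data already exists in " + target + ", duplicate publish possible");
      }
    }

    // Pass 2: nothing conflicted, so actually move the data.
    for (FileStatus child : children) {
      Path target = new Path(tableDir, child.getPath().getName());
      if (!fs.rename(child.getPath(), target)) {
        throw new IOException("Failed to move " + child.getPath() + " to " + target);
      }
    }

    // The scratch directory is no longer needed once everything has moved.
    fs.delete(scratchDir, true);
  }
}
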
+// for (Partition partition : partitionsToAdd){ +// partitionsAdded.add(client.add_partition(partition)); +// // currently following add_partition instead of add_partitions because latter isn't +// // all-or-nothing and we want to be able to roll back partitions we added if need be. +// } - Map params = new HashMap(); - params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass()); - params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass()); + try { + client.add_partitions(partitionsToAdd); + partitionsAdded = partitionsToAdd; + } catch (Exception e){ + // There was an error adding partitions : rollback fs copy and rethrow + for (Partition p : partitionsToAdd){ + Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation()))); + if (fs.exists(ptnPath)){ + fs.delete(ptnPath,true); + } + } + throw e; + } - //Copy table level hcat.* keys to the partition - for(Map.Entry entry : storer.getProperties().entrySet()) { - params.put(entry.getKey().toString(), entry.getValue().toString()); - } + }else{ + // no harProcessor, regular operation - partition.setParameters(params); + // No duplicate partition publish case to worry about because we'll + // get a AlreadyExistsException here if so, and appropriately rollback + + client.add_partitions(partitionsToAdd); + partitionsAdded = partitionsToAdd; - // Sets permissions and group name on partition dirs. - FileStatus tblStat = fs.getFileStatus(tblPath); - String grpName = tblStat.getGroup(); - FsPermission perms = tblStat.getPermission(); - Path partPath = tblPath; - for(FieldSchema partKey : table.getPartitionKeys()){ - partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); - fs.setPermission(partPath, perms); - try{ - fs.setOwner(partPath, null, grpName); - } catch(AccessControlException ace){ - // log the messages before ignoring. Currently, logging is not built in Hcatalog. + if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){ + Path src = new Path(ptnRootLocation); + moveTaskOutputs(fs, src, src, tblPath,false); + fs.delete(src, true); } + } - - //Publish the new partition - client.add_partition(partition); - partitionAdded = true; //publish to metastore done - + if( baseCommitter != null ) { baseCommitter.cleanupJob(context); } @@ -247,13 +375,24 @@ (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); } + + String jcTokenStrForm = + context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM); + String jcTokenSignature = + context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE); + if(jcTokenStrForm != null && jcTokenSignature != null) { + HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature); + } + } catch (Exception e) { - if( partitionAdded ) { + if( partitionsAdded.size() > 0 ) { try { //baseCommitter.cleanupJob failed, try to clean up the metastore + for (Partition p : partitionsAdded){ client.dropPartition(tableInfo.getDatabaseName(), - tableInfo.getTableName(), values); + tableInfo.getTableName(), p.getValues()); + } } catch(Exception te) { //Keep cause as the original exception throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); @@ -272,6 +411,114 @@ } } + private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) { + if (ptnRootLocation == null){ + // we only need to calculate it once, it'll be the same for other partitions in this job. 
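
Partition registration above is kept all-or-nothing: the partitions go to the metastore in one add_partitions() call, and if that call fails the already-copied partition directories are deleted (and, on a later failure, the added partitions are dropped) so data and metadata stay consistent. A condensed sketch of the publish-with-rollback shape, with the metastore and filesystem handling simplified:

import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;

public class PartitionPublishSketch {

  /** Register all partitions, rolling the copied directories back if the metastore call fails. */
  public static void addPartitionsOrRollback(HiveMetaStoreClient client, FileSystem fs,
      List<Partition> partitionsToAdd) throws Exception {
    try {
      client.add_partitions(partitionsToAdd);
    } catch (Exception e) {
      // Metastore rejected the batch: remove the data we already moved so a retry starts clean.
      for (Partition p : partitionsToAdd) {
        Path partitionDir = new Path(p.getSd().getLocation());
        if (fs.exists(partitionDir)) {
          fs.delete(partitionDir, true);
        }
      }
      throw e;
    }
  }
}
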
+ Path ptnRoot = new Path(ptnLocn); + for (int i = 0; i < numPtnKeys; i++){ +// LOG.info("Getting parent of "+ptnRoot.getName()); + ptnRoot = ptnRoot.getParent(); + } + ptnRootLocation = ptnRoot.toString(); + } +// LOG.info("Returning final parent : "+ptnRootLocation); + return ptnRootLocation; + } + + /** + * Generate partition metadata object to be used to add to metadata. + * @param partLocnRoot The table-equivalent location root of the partition + * (temporary dir if dynamic partition, table dir if static) + * @param partKVs The keyvalue pairs that form the partition + * @param outputSchema The output schema for the partition + * @param params The parameters to store inside the partition + * @param table The Table metadata object under which this Partition will reside + * @param fs FileSystem object to operate on the underlying filesystem + * @param grpName Group name that owns the table dir + * @param perms FsPermission that's the default permission of the table dir. + * @return Constructed Partition metadata object + * @throws IOException + */ + + private Partition constructPartition( + JobContext context, + String partLocnRoot, Map partKVs, + HCatSchema outputSchema, Map params, + Table table, FileSystem fs, + String grpName, FsPermission perms) throws IOException { + + StorageDescriptor tblSD = table.getSd(); + + Partition partition = new Partition(); + partition.setDbName(table.getDbName()); + partition.setTableName(table.getTableName()); + partition.setSd(new StorageDescriptor(tblSD)); + + List fields = new ArrayList(); + for(HCatFieldSchema fieldSchema : outputSchema.getFields()) { + fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema)); + } + + partition.getSd().setCols(fields); + + partition.setValues(getPartitionValueList(table,partKVs)); + + partition.setParameters(params); + + // Sets permissions and group name on partition dirs. + + Path partPath = new Path(partLocnRoot); + for(FieldSchema partKey : table.getPartitionKeys()){ + partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); +// LOG.info("Setting perms for "+partPath.toString()); + fs.setPermission(partPath, perms); + try{ + fs.setOwner(partPath, null, grpName); + } catch(AccessControlException ace){ + // log the messages before ignoring. Currently, logging is not built in Hcatalog. 
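
getPartitionRootLocation() above recovers the job-wide scratch root from any one partition path simply by stepping up one parent per partition key, e.g. .../_DYN0.123/emp_country=IN/emp_state=KA with two partition keys walks back to .../_DYN0.123. A tiny sketch of that walk (the sample path is illustrative):

import org.apache.hadoop.fs.Path;

public class PartitionRootSketch {

  /** Strip one path segment per partition key to get back to the common root. */
  public static Path partitionRoot(Path partitionPath, int numPartitionKeys) {
    Path root = partitionPath;
    for (int i = 0; i < numPartitionKeys; i++) {
      root = root.getParent();
    }
    return root;
  }

  public static void main(String[] args) {
    Path p = new Path("/tmp/hcat_junit_warehouse/employee/_DYN0.123/emp_country=IN/emp_state=KA");
    // Prints /tmp/hcat_junit_warehouse/employee/_DYN0.123
    System.out.println(partitionRoot(p, 2));
  }
}
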
+// LOG.warn(ace); + } + } + if (dynamicPartitioningUsed){ + String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs); + if (harProcessor.isEnabled()){ + harProcessor.exec(context, partition, partPath); + partition.getSd().setLocation( + harProcessor.getProcessedLocation(new Path(dynamicPartitionDestination))); + }else{ + partition.getSd().setLocation(dynamicPartitionDestination); + } + }else{ + partition.getSd().setLocation(partPath.toString()); + } + + return partition; + } + + + + private String getFinalDynamicPartitionDestination(Table table, Map partKVs) { + // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA -> + // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA + Path partPath = new Path(table.getSd().getLocation()); + for(FieldSchema partKey : table.getPartitionKeys()){ + partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); + } + return partPath.toString(); + } + + private Map getStorerParameterMap(StorerInfo storer) { + Map params = new HashMap(); + params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass()); + params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass()); + + //Copy table level hcat.* keys to the partition + for(Map.Entry entry : storer.getProperties().entrySet()) { + params.put(entry.getKey().toString(), entry.getValue().toString()); + } + return params; + } + private Path constructPartialPartPath(Path partialPath, String partKey, Map partKVs){ StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey)); @@ -344,31 +591,42 @@ * @param file the file to move * @param src the source directory * @param dest the target directory + * @param dryRun - a flag that simply tests if this move would succeed or not based + * on whether other files exist where we're trying to copy * @throws IOException */ private void moveTaskOutputs(FileSystem fs, Path file, Path src, - Path dest) throws IOException { + Path dest, boolean dryRun) throws IOException { if (fs.isFile(file)) { Path finalOutputPath = getFinalPath(file, src, dest); - if (!fs.rename(file, finalOutputPath)) { - if (!fs.delete(finalOutputPath, true)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath); + if (dryRun){ +// LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem"); + if (fs.exists(finalOutputPath)){ + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible."); } + }else{ +// LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]"); if (!fs.rename(file, finalOutputPath)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest); + if (!fs.delete(finalOutputPath, true)) { + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath); + } + if (!fs.rename(file, finalOutputPath)) { + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest); + } } } } else if(fs.getFileStatus(file).isDir()) { FileStatus[] paths = fs.listStatus(file); Path finalOutputPath = getFinalPath(file, src, dest); - fs.mkdirs(finalOutputPath); - + if (!dryRun){ + fs.mkdirs(finalOutputPath); + } if (paths != null) { for (FileStatus path : paths) { - moveTaskOutputs(fs, path.getPath(), src, dest); + moveTaskOutputs(fs, path.getPath(), src, dest,dryRun); } } } @@ -398,4 +656,72 @@ } } + /** + * Run to discover 
dynamic partitions available + */ + private void discoverPartitions(JobContext context) throws IOException { + if (!partitionsDiscovered){ + // LOG.info("discover ptns called"); + + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + + harProcessor.setEnabled(jobInfo.getHarRequested()); + + List dynamicPartCols = jobInfo.getPosOfDynPartCols(); + int maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); + + Path loadPath = new Path(jobInfo.getLocation()); + FileSystem fs = loadPath.getFileSystem(context.getConfiguration()); + + // construct a path pattern (e.g., /*/*) to find all dynamically generated paths + + String dynPathSpec = loadPath.toUri().getPath(); + dynPathSpec = dynPathSpec.replaceAll("__HIVE_DEFAULT_PARTITION__", "*"); + // TODO : replace this with a param pull from HiveConf + + // LOG.info("Searching for "+dynPathSpec); + Path pathPattern = new Path(loadPath, dynPathSpec); + FileStatus[] status = fs.globStatus(pathPattern); + + partitionsDiscoveredByPath = new LinkedHashMap>(); + storageDriversDiscoveredByPath = new LinkedHashMap(); + + + if (status.length == 0) { + // LOG.warn("No partition found genereated by dynamic partitioning in [" + // +loadPath+"] with depth["+jobInfo.getTable().getPartitionKeysSize() + // +"], dynSpec["+dynPathSpec+"]"); + }else{ + if ((maxDynamicPartitions != -1) && (status.length > maxDynamicPartitions)){ + this.partitionsDiscovered = true; + throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, + "Number of dynamic partitions being created " + + "exceeds configured max allowable partitions[" + + maxDynamicPartitions + + "], increase parameter [" + + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + + "] if needed."); + } + + for (FileStatus st : status){ + LinkedHashMap fullPartSpec = new LinkedHashMap(); + Warehouse.makeSpecFromName(fullPartSpec, st.getPath()); + partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec); + storageDriversDiscoveredByPath.put(st.getPath().toString(), + HCatOutputFormat.getOutputDriverInstance(context, jobInfo, fullPartSpec)); + } + } + + // for (Entry> spec : partitionsDiscoveredByPath.entrySet()){ + // LOG.info("Partition "+ spec.getKey()); + // for (Entry e : spec.getValue().entrySet()){ + // LOG.info(e.getKey() + "=>" +e.getValue()); + // } + // } + + this.partitionsDiscovered = true; + } + } + + } Index: src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (revision 1149353) +++ src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (working copy) @@ -91,7 +91,7 @@ @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { OutputFormat, ? 
super Writable> outputFormat = getOutputFormat(context);
-    return new HCatEximOutputCommitter(outputFormat.getOutputCommitter(context));
+    return new HCatEximOutputCommitter(context,outputFormat.getOutputCommitter(context));
   }
 
   public static void setOutput(Job job, String dbname, String tablename, String location,
Index: src/java/org/apache/hcatalog/common/ErrorType.java
===================================================================
--- src/java/org/apache/hcatalog/common/ErrorType.java    (revision 1149353)
+++ src/java/org/apache/hcatalog/common/ErrorType.java    (working copy)
@@ -41,8 +41,8 @@
   ERROR_INVALID_PARTITION_VALUES (2010, "Invalid partition values specified"),
   ERROR_MISSING_PARTITION_KEY (2011, "Partition key value not provided for publish"),
   ERROR_MOVE_FAILED (2012, "Moving of data failed during commit"),
+  ERROR_TOO_MANY_DYNAMIC_PTNS (2013, "Attempt to create too many dynamic partitions"),
-
 
   /* Authorization Errors 3000 - 3999 */
   ERROR_ACCESS_CONTROL (3000, "Permission denied"),
 
Index: src/java/org/apache/hcatalog/common/HCatConstants.java
===================================================================
--- src/java/org/apache/hcatalog/common/HCatConstants.java    (revision 1149353)
+++ src/java/org/apache/hcatalog/common/HCatConstants.java    (working copy)
@@ -64,13 +64,18 @@
   public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info";
   public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf";
   public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig";
-
+  public static final String HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.sig";
+  public static final String HCAT_KEY_JOBCLIENT_TOKEN_STRFORM = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.strform";
+
   public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
   public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
 
   public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
   public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
   public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
+
+  public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
+  public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
 
   // Message Bus related properties.
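
The new ERROR_TOO_MANY_DYNAMIC_PTNS code and the HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED switch tie into the -1 sentinel used elsewhere in this patch: when the check is disabled the limit resolves to -1 and is never enforced, otherwise Hive's hive.exec.max.dynamic.partitions setting is the ceiling. A small sketch of that resolution and check (method names are illustrative):

import java.io.IOException;

import org.apache.hadoop.hive.conf.HiveConf;

public class DynamicPartitionLimitSketch {

  /** Resolve the effective limit: -1 disables the check entirely. */
  public static int effectiveLimit(HiveConf hiveConf, boolean checkEnabled) {
    return checkEnabled ? hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS) : -1;
  }

  /** Fail once the number of partitions created crosses the resolved limit. */
  public static void enforce(int partitionsSoFar, int limit) throws IOException {
    if (limit != -1 && partitionsSoFar > limit) {
      throw new IOException("Attempt to create too many dynamic partitions: " + partitionsSoFar
          + " exceeds " + limit + "; increase " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
          + " if needed.");
    }
  }
}
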
public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat"; Index: src/java/org/apache/hcatalog/common/HCatUtil.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatUtil.java (revision 1149353) +++ src/java/org/apache/hcatalog/common/HCatUtil.java (working copy) @@ -28,21 +28,42 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.thrift.TException; public class HCatUtil { +// static final private Log LOG = LogFactory.getLog(HCatUtil.class); + public static boolean checkJobContextIfRunningFromBackend(JobContext j){ if (j.getConfiguration().get("mapred.task.id", "").equals("")){ return false; @@ -256,4 +277,102 @@ return true; } + public static Token getJobTrackerDelegationToken(Configuration conf, String userName) throws Exception { +// LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")"); + JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class)); + Token t = jcl.getDelegationToken(new Text(userName)); +// LOG.info("got "+t); + return t; + +// return null; + } + + public static void cancelJobTrackerDelegationToken(String tokenStrForm, String tokenSignature) throws Exception { +// LOG.info("cancelJobTrackerDelegationToken("+tokenStrForm+","+tokenSignature+")"); + JobClient jcl = new JobClient(new JobConf(new Configuration(), HCatOutputFormat.class)); + Token t = extractJobTrackerToken(tokenStrForm,tokenSignature); +// LOG.info("canceling "+t); + try { + jcl.cancelDelegationToken(t); + }catch(Exception e){ +// HCatUtil.logToken(LOG, "jcl token to cancel", t); + // ignore if token has already been invalidated. 
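
The delegation-token helpers above exist because the HAR step can launch a separate MapReduce job at commit time, so the client fetches a JobTracker delegation token up front and cancels it once the output job is done. A minimal usage sketch against the old-API JobClient; the renewer name is a placeholder and the exact token identifier type can vary across Hadoop versions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.Token;

public class JobTrackerTokenSketch {

  public static void main(String[] args) throws Exception {
    JobClient jobClient = new JobClient(new JobConf(new Configuration()));

    // Ask the JobTracker for a delegation token so a child job (e.g. the HAR job)
    // can authenticate on our behalf.
    Token<DelegationTokenIdentifier> token = jobClient.getDelegationToken(new Text("renewer"));

    // The URL-string form is what gets stashed in the job configuration.
    String strForm = token.encodeToUrlString();
    System.out.println("token (url-string form): " + strForm);

    // Cancelling it when the job is done keeps the JobTracker's token store clean.
    jobClient.cancelDelegationToken(token);
  }
}
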
+ } + } + + + public static Token + extractThriftToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException { +// LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")"); + Token t = new Token(); + t.decodeFromUrlString(tokenStrForm); + t.setService(new Text(tokenSignature)); +// LOG.info("returning "+t); + return t; + } + + public static Token + extractJobTrackerToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException { +// LOG.info("extractJobTrackerToken("+tokenStrForm+","+tokenSignature+")"); + Token t = + new Token(); + t.decodeFromUrlString(tokenStrForm); + t.setService(new Text(tokenSignature)); +// LOG.info("returning "+t); + return t; + } + + /** + * Logging stack trace + * @param logger + */ + public static void logStackTrace(Log logger) { + StackTraceElement[] stackTrace = new Exception().getStackTrace(); + for (int i = 1 ; i < stackTrace.length ; i++){ + logger.info("\t"+stackTrace[i].toString()); + } + } + + /** + * debug log the hive conf + * @param logger + * @param hc + */ + public static void logHiveConf(Log logger, HiveConf hc){ + logEntrySet(logger,"logging hiveconf:",hc.getAllProperties().entrySet()); + } + + + public static void logList(Log logger, String itemName, List list){ + logger.info(itemName+":"); + for (Object item : list){ + logger.info("\t["+item+"]"); + } + } + + public static void logMap(Log logger, String itemName, Map map){ + logEntrySet(logger,itemName,map.entrySet()); + } + + public static void logEntrySet(Log logger, String itemName, Set entrySet) { + logger.info(itemName+":"); + for (Entry e : entrySet){ + logger.info("\t["+e.getKey()+"]=>["+e.getValue()+"]"); + } + } + + public static void logAllTokens(Log logger, JobContext context) throws IOException { + for (Tokent : context.getCredentials().getAllTokens()){ + logToken(logger,"token",t); + } + } + + public static void logToken(Log logger, String itemName, Token t) throws IOException { + logger.info(itemName+":"); + logger.info("\tencodeToUrlString : "+t.encodeToUrlString()); + logger.info("\ttoString : "+t.toString()); + logger.info("\tkind : "+t.getKind()); + logger.info("\tservice : "+t.getService()); + } + } Index: src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java =================================================================== --- src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java (revision 0) +++ src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java (revision 0) @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hcatalog.har; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Constants; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.tools.HadoopArchives; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; + +public class HarOutputCommitterPostProcessor { + +// static final private Log LOG = LogFactory.getLog(HarOutputCommitterPostProcessor.class); + + boolean isEnabled = false; + + public boolean isEnabled() { + return isEnabled; + } + + public void setEnabled(boolean enabled) { + this.isEnabled = enabled; + } + + + public void exec(JobContext context, Partition partition, Path partPath) throws IOException { +// LOG.info("Archiving partition ["+partPath.toString()+"]"); + makeHar(context,partPath.toUri().toString(),harFile(partPath)); + partition.getParameters().put(Constants.IS_ARCHIVED, "true"); + } + + public String harFile(Path ptnPath) throws IOException{ + String harFile = ptnPath.toString().replaceFirst("/+$", "") + ".har"; +// LOG.info("har file : " + harFile); + return harFile; + } + + public String getParentFSPath(Path ptnPath) throws IOException { + return ptnPath.toUri().getPath().replaceFirst("/+$", ""); + } + + public String getProcessedLocation(Path ptnPath) throws IOException { + String harLocn = ("har://" + ptnPath.toUri().getPath()).replaceFirst("/+$", "") + ".har" + Path.SEPARATOR; +// LOG.info("har location : " + harLocn); + return harLocn; + } + + + /** + * Creates a har file from the contents of a given directory, using that as root. 
+ * @param dir Directory to archive + * @param harName The HAR file to create + */ + public static void makeHar(JobContext context, String dir, String harFile) throws IOException{ +// Configuration conf = context.getConfiguration(); +// Credentials creds = context.getCredentials(); + +// HCatUtil.logAllTokens(LOG,context); + + int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR); + Path archivePath = new Path(harFile.substring(0,lastSep)); + final String[] args = { + "-archiveName", + harFile.substring(lastSep+1, harFile.length()), + "-p", + dir, + "*", + archivePath.toString() + }; +// for (String arg : args){ +// LOG.info("Args to har : "+ arg); +// } + try { + Configuration newConf = new Configuration(); + FileSystem fs = archivePath.getFileSystem(newConf); + + newConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); +// LOG.info("System.getenv(\"HADOOP_TOKEN_FILE_LOCATION\") =["+ System.getenv("HADOOP_TOKEN_FILE_LOCATION")+"]"); + +// for (FileStatus ds : fs.globStatus(new Path(dir, "*"))){ +// LOG.info("src : "+ds.getPath().toUri().toString()); +// } + + final HadoopArchives har = new HadoopArchives(newConf); + int rc = ToolRunner.run(har, args); + if (rc!= 0){ + throw new Exception("Har returned error code "+rc); + } + +// for (FileStatus hs : fs.globStatus(new Path(harFile, "*"))){ +// LOG.info("dest : "+hs.getPath().toUri().toString()); +// } +// doHarCheck(fs,harFile); +// LOG.info("Nuking " + dir); + fs.delete(new Path(dir), true); + } catch (Exception e){ + throw new HCatException("Error creating Har ["+harFile+"] from ["+dir+"]", e); + } + } + +} Index: build.xml =================================================================== --- build.xml (revision 1149353) +++ build.xml (working copy) @@ -107,6 +107,7 @@ +
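
To summarize the archiving convention used by HarOutputCommitterPostProcessor above: for a partition directory P, makeHar() builds P.har alongside it, and the partition registered in the metastore then points at a har:// URI rooted inside that archive. The path arithmetic is plain string and Path manipulation, sketched here with an illustrative location:

import org.apache.hadoop.fs.Path;

public class HarNamingSketch {

  /** The archive sits next to the partition directory, with a .har suffix. */
  public static String harFileFor(Path partitionDir) {
    return partitionDir.toString().replaceFirst("/+$", "") + ".har";
  }

  /** The registered partition location points inside the archive via the har:// scheme. */
  public static String harLocationFor(Path finalPartitionDir) {
    return ("har://" + finalPartitionDir.toUri().getPath()).replaceFirst("/+$", "")
        + ".har" + Path.SEPARATOR;
  }

  public static void main(String[] args) {
    Path ptn = new Path("file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA");
    // e.g. file:/tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA.har
    System.out.println(harFileFor(ptn));
    // e.g. har:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA.har/
    System.out.println(harLocationFor(ptn));
  }
}
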