Index: src/java/org/apache/hcatalog/pig/HCatLoader.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatLoader.java	(revision 1312006)
+++ src/java/org/apache/hcatalog/pig/HCatLoader.java	(working copy)
@@ -18,7 +18,11 @@
 package org.apache.hcatalog.pig;
 
 import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
@@ -26,6 +30,7 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.Pair;
@@ -54,10 +59,13 @@
   private String hcatServerUri;
   private String partitionFilterString;
   private final PigHCatUtil phutil = new PigHCatUtil();
-
+
   // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
   final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
   final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
+  // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+  // unique to the load func and input file name (table, in our case).
+  private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
 
   @Override
   public InputFormat getInputFormat() throws IOException {
@@ -75,11 +83,16 @@
 
   @Override
   public void setLocation(String location, Job job) throws IOException {
+    UDFContext udfContext = UDFContext.getUDFContext();
+    Properties udfProps = udfContext.getUDFProperties(this.getClass(),
+        new String[]{signature});
     job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
     Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
     dbName = dbTablePair.first;
     tableName = dbTablePair.second;
+    RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
+        .get(PRUNE_PROJECTION_INFO);
     // get partitionFilterString stored in the UDFContext - it would have
     // been stored there by an earlier call to setPartitionFilter
     // call setInput on HCatInputFormat only in the frontend because internally
@@ -87,50 +100,71 @@
     // the backend
     // in the hadoop front end mapred.task.id property will not be set in
     // the Configuration
-    if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){
-      HCatInputFormat.setInput(job,
-          InputJobInfo.create(dbName,
-              tableName,
-              getPartitionFilterString()));
-    }
-    // Need to also push projections by calling setOutputSchema on
-    // HCatInputFormat - we have to get the RequiredFields information
-    // from the UdfContext, translate it to an Schema and then pass it
-    // The reason we do this here is because setLocation() is called by
-    // Pig runtime at InputFormat.getSplits() and
-    // InputFormat.createRecordReader() time - we are not sure when
-    // HCatInputFormat needs to know about pruned projections - so doing it
-    // here will ensure we communicate to HCatInputFormat about pruned
-    // projections at getSplits() and createRecordReader() time
+    if (udfProps.containsKey(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET)) {
+      for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements();) {
+        PigHCatUtil.getConfigFromUDFProperties(udfProps,
+            job.getConfiguration(), emr.nextElement().toString());
+      }
+      Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + signature);
+      if (crd != null) {
+        job.getCredentials().addAll(crd);
+      }
-    UDFContext udfContext = UDFContext.getUDFContext();
-    Properties props = udfContext.getUDFProperties(this.getClass(),
-        new String[]{signature});
-    RequiredFieldList requiredFieldsInfo =
-        (RequiredFieldList)props.get(PRUNE_PROJECTION_INFO);
-    if(requiredFieldsInfo != null) {
-      // convert to hcatschema and pass to HCatInputFormat
-      try {
-        outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
-        HCatInputFormat.setOutputSchema(job, outputSchema);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    } else{
-      // else - this means pig's optimizer never invoked the pushProjection
-      // method - so we need all fields and hence we should not call the
-      // setOutputSchema on HCatInputFormat
-      if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
-        try {
-          HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
-          outputSchema = hcatTableSchema;
-          HCatInputFormat.setOutputSchema(job, outputSchema);
-        } catch (Exception e) {
-          throw new IOException(e);
+    } else {
+      Job clone = new Job(job.getConfiguration());
+      HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
+          tableName, getPartitionFilterString()));
+
+      // We will store all the new/changed properties of the job in the
+      // udf context, so that the HCatInputFormat.setInput method need not
+      // be called many times.
+      for (Entry<String, String> keyValue : job.getConfiguration()) {
+        String oldValue = clone.getConfiguration().get(keyValue.getKey());
+        if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+          udfProps.put(keyValue.getKey(), keyValue.getValue());
+        }
+      }
+      udfProps.put(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET, true);
+
+      // Store credentials in a private hash map, not the udf context, to
+      // make sure they are not made public.
+      jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + signature,job.getCredentials());
        }
-      }
-    }
+
+    // Need to also push projections by calling setOutputSchema on
+    // HCatInputFormat - we have to get the RequiredFields information
+    // from the UdfContext, translate it to a Schema and then pass it
+    // The reason we do this here is because setLocation() is called by
+    // Pig runtime at InputFormat.getSplits() and
+    // InputFormat.createRecordReader() time - we are not sure when
+    // HCatInputFormat needs to know about pruned projections - so doing it
+    // here will ensure we communicate to HCatInputFormat about pruned
+    // projections at getSplits() and createRecordReader() time
+
+    if(requiredFieldsInfo != null) {
+      // convert to hcatschema and pass to HCatInputFormat
+      try {
+        outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
+        HCatInputFormat.setOutputSchema(job, outputSchema);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    } else{
+      // else - this means pig's optimizer never invoked the pushProjection
+      // method - so we need all fields and hence we should not call the
+      // setOutputSchema on HCatInputFormat
+      if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
+        try {
+          HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
+          outputSchema = hcatTableSchema;
+          HCatInputFormat.setOutputSchema(job, outputSchema);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+    }
+  }
 
   @Override
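[Editor's note] The loader change above caches the outcome of the one expensive HCatInputFormat.setInput call: it snapshots the Job configuration in a clone, runs the setup once, and stores only the keys that changed in the UDFContext properties so later setLocation calls can replay them. The following standalone sketch is not part of the patch; it only illustrates that configuration-delta idiom. The class name ConfigDeltaSketch and the helpers captureDelta/replayDelta are illustrative assumptions, not HCatalog APIs.

import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ConfigDeltaSketch {

  // Copy every key whose value is new or different in 'current', compared to
  // 'snapshot', into 'cache'.
  public static void captureDelta(Configuration snapshot, Configuration current,
      Properties cache) {
    for (Entry<String, String> keyValue : current) {
      String oldValue = snapshot.get(keyValue.getKey());
      if (oldValue == null || !keyValue.getValue().equals(oldValue)) {
        cache.setProperty(keyValue.getKey(), keyValue.getValue());
      }
    }
  }

  // Replay a previously captured delta into a fresh configuration.
  public static void replayDelta(Properties cache, Configuration conf) {
    for (String name : cache.stringPropertyNames()) {
      conf.set(name, cache.getProperty(name));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    Configuration snapshot = new Configuration(job.getConfiguration());

    // Stand-in for the expensive call (HCatInputFormat.setInput in the patch).
    job.getConfiguration().set("example.table.info", "db.table");

    Properties cache = new Properties();
    captureDelta(snapshot, job.getConfiguration(), cache);

    // A later call working on a new Job can reuse the cached delta instead of
    // repeating the setup.
    Job laterCall = new Job(new Configuration());
    replayDelta(cache, laterCall.getConfiguration());
    System.out.println(laterCall.getConfiguration().get("example.table.info"));
  }
}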
Index: src/java/org/apache/hcatalog/pig/HCatStorer.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatStorer.java	(revision 1312006)
+++ src/java/org/apache/hcatalog/pig/HCatStorer.java	(working copy)
@@ -19,15 +19,19 @@
 package org.apache.hcatalog.pig;
 
 import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
@@ -49,6 +53,9 @@
   // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
   final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
   final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+  // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+  // unique to the store func and output file name (table, in our case).
+  private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
 
   public HCatStorer(String partSpecs, String schema) throws Exception {
 
@@ -70,79 +77,83 @@
 
   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
-    job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
-    Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
-    String[] userStr = location.split("\\.");
-    OutputJobInfo outputJobInfo;
+    Configuration config = job.getConfiguration();
+    config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
+    Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+        this.getClass(), new String[] { sign });
+    String[] userStr = location.split("\\.");
 
-    String outInfoString = p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-    if (outInfoString != null) {
-      outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outInfoString);
-    } else {
-      if(userStr.length == 2) {
-        outputJobInfo = OutputJobInfo.create(userStr[0],
-            userStr[1],
-            partitions);
-      } else if(userStr.length == 1) {
-        outputJobInfo = OutputJobInfo.create(null,
-            userStr[0],
-            partitions);
-      } else {
-        throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    }
+    if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
+      for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements();) {
+        PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
+      }
+      Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
+      if (crd != null) {
+        job.getCredentials().addAll(crd);
+      }
+    } else {
+      Job clone = new Job(job.getConfiguration());
+      OutputJobInfo outputJobInfo;
+      if (userStr.length == 2) {
+        outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1],
+            partitions);
+      } else if (userStr.length == 1) {
+        outputJobInfo = OutputJobInfo.create(null, userStr[0],
+            partitions);
+      } else {
+        throw new FrontendException("location " + location
+            + " is invalid. It must be of the form [db.]table",
+            PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+      Schema schema = (Schema) ObjectSerializer.deserialize(udfProps
+          .getProperty(PIG_SCHEMA));
+      if (schema != null) {
+        pigSchema = schema;
+      }
+      if (pigSchema == null) {
+        throw new FrontendException(
+            "Schema for data cannot be determined.",
+            PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+      try {
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+      } catch (HCatException he) {
+        // pass the message to the user - essentially something about
+        // the table information passed to HCatOutputFormat was not right
+        throw new PigException(he.getMessage(),
+            PigHCatUtil.PIG_EXCEPTION_CODE, he);
+      }
+      HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
+      try {
+        doSchemaValidations(pigSchema, hcatTblSchema);
+      } catch (HCatException he) {
+        throw new FrontendException(he.getMessage(),
+            PigHCatUtil.PIG_EXCEPTION_CODE, he);
+      }
+      computedSchema = convertPigSchemaToHCatSchema(pigSchema,
+          hcatTblSchema);
+      HCatOutputFormat.setSchema(job, computedSchema);
+      udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(computedSchema));
+      // We will store all the new/changed properties of the job in the
+      // udf context, so that the HCatOutputFormat.setOutput and setSchema
+      // methods need not be called many times.
+      for (Entry<String, String> keyValue : job.getConfiguration()) {
+        String oldValue = clone.getConfiguration().get(keyValue.getKey());
+        if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+          udfProps.put(keyValue.getKey(), keyValue.getValue());
+        }
-    Configuration config = job.getConfiguration();
-    if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){
-
-      Schema schema = (Schema)ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
-      if(schema != null){
-        pigSchema = schema;
-      }
-      if(pigSchema == null){
-        throw new FrontendException("Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-      try{
-        HCatOutputFormat.setOutput(job, outputJobInfo);
-      } catch(HCatException he) {
-        // pass the message to the user - essentially something about the table
-        // information passed to HCatOutputFormat was not right
-        throw new PigException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
-      }
-      HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
-      try{
-        doSchemaValidations(pigSchema, hcatTblSchema);
-      } catch(HCatException he){
-        throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
-      }
-      computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema);
-      HCatOutputFormat.setSchema(job, computedSchema);
-      p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF);
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_OUTPUT_INFO);
-
-      p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
-
-    }else{
-      config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-
-      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);
-      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
-      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
-      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
-      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
-
-    }
+      }
+      // Store credentials in a private hash map, not the udf context, to
+      // make sure they are not made public.
+      jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign,job.getCredentials());
+      udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
+    }
   }
 
-
   @Override
   public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
     if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
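[Editor's note] The storer keeps the job Credentials out of the UDFContext properties and instead holds them in a private static map keyed by the Pig signature, re-adding them to the Job on later setStoreLocation calls. The standalone sketch below is not part of the patch; it only illustrates that credential-caching idiom. The class CredentialCacheSketch, the key prefix, and the helpers remember/restore are illustrative assumptions, not HCatalog APIs.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.Credentials;

public class CredentialCacheSketch {

  private static final String KEY_PREFIX = "example_inner_signature";
  private static Map<String, Credentials> jobCredentials =
      new HashMap<String, Credentials>();

  // First call: remember whatever credentials the setup step added to the job.
  public static void remember(String signature, Job job) {
    jobCredentials.put(KEY_PREFIX + "_" + signature, job.getCredentials());
  }

  // Later calls: copy the remembered credentials into the new Job instance.
  public static void restore(String signature, Job job) {
    Credentials crd = jobCredentials.get(KEY_PREFIX + "_" + signature);
    if (crd != null) {
      job.getCredentials().addAll(crd);
    }
  }

  public static void main(String[] args) throws Exception {
    Job first = new Job(new Configuration());
    first.getCredentials().addSecretKey(new Text("example.secret"), "s3cr3t".getBytes());
    remember("sig-1", first);

    Job second = new Job(new Configuration());
    restore("sig-1", second);
    // Prints true: the second job sees the cached secret without any metastore work.
    System.out.println(second.getCredentials().getSecretKey(new Text("example.secret")) != null);
  }
}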
Index: src/java/org/apache/hcatalog/common/HCatConstants.java
===================================================================
--- src/java/org/apache/hcatalog/common/HCatConstants.java	(revision 1312006)
+++ src/java/org/apache/hcatalog/common/HCatConstants.java	(working copy)
@@ -28,15 +28,17 @@
   public static final String SEQUENCEFILE_INPUT = SequenceFileInputFormat.class.getName();
   public static final String SEQUENCEFILE_OUTPUT = SequenceFileOutputFormat.class.getName();
-
+
   public static final String HCAT_PIG_STORAGE_CLASS = "org.apache.pig.builtin.PigStorage";
   public static final String HCAT_PIG_LOADER = "hcat.pig.loader";
+  public static final String HCAT_PIG_LOADER_LOCATION_SET = HCAT_PIG_LOADER + ".location.set";
   public static final String HCAT_PIG_LOADER_ARGS = "hcat.pig.loader.args";
   public static final String HCAT_PIG_STORER = "hcat.pig.storer";
   public static final String HCAT_PIG_STORER_ARGS = "hcat.pig.storer.args";
   public static final String HCAT_PIG_ARGS_DELIMIT = "hcat.pig.args.delimiter";
   public static final String HCAT_PIG_ARGS_DELIMIT_DEFAULT = ",";
-
+  public static final String HCAT_PIG_STORER_LOCATION_SET = HCAT_PIG_STORER + ".location.set";
+
   //The keys used to store info into the job Configuration
   public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat";
@@ -59,7 +61,7 @@
   public static final String HCAT_CREATE_DB_NAME = "hcat.create.db.name";
 
-  public static final String HCAT_METASTORE_PRINCIPAL 
+  public static final String HCAT_METASTORE_PRINCIPAL
       = HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname;
 
   // IMPORTANT IMPORTANT IMPORTANT!!!!!
@@ -86,11 +88,11 @@
   public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
   public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
-
+
   public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
   public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
   public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
-
+
   public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
   public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
@@ -107,7 +109,7 @@
   // System environment variables
   public static final String SYSENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION";
-
+
   // Hadoop Conf Var Names
   public static final String CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
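[Editor's note] The two new constants act as one-time flags in the UDFContext properties: the first setLocation/setStoreLocation call sets the flag after caching the config delta, and every later call detects it and takes the replay path. The short sketch below is not part of the patch; the resolved flag values follow from the HCAT_PIG_LOADER and HCAT_PIG_STORER definitions above, and the Properties object is a stand-in for the UDFContext properties.

import java.util.Properties;

public class LocationFlagSketch {

  // Mirrors HCatConstants: HCAT_PIG_LOADER + ".location.set"
  static final String HCAT_PIG_LOADER_LOCATION_SET = "hcat.pig.loader.location.set";

  static void setLocationOnce(Properties udfProps) {
    if (udfProps.containsKey(HCAT_PIG_LOADER_LOCATION_SET)) {
      System.out.println("flag present: replaying cached properties");
    } else {
      System.out.println("first call: doing the expensive setup");
      udfProps.put(HCAT_PIG_LOADER_LOCATION_SET, true);
    }
  }

  public static void main(String[] args) {
    Properties udfProps = new Properties();
    setLocationOnce(udfProps); // first call: expensive setup
    setLocationOnce(udfProps); // second call: cached path
  }
}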