diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 107faf7c84..4e2e8f89f2 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -33,7 +33,11 @@ import java.util.Map; import java.util.Properties; +import javax.security.auth.login.LoginException; + +import com.google.common.collect.Maps; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -76,13 +80,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; - public class HCatUtil { private static final Logger LOG = LoggerFactory.getLogger(HCatUtil.class); + private static final HashMap hiveConfDefaults; private static volatile HiveClientCache hiveClientCache; + static { + // Load all of the default config values from HiveConf. + hiveConfDefaults = new HashMap(HiveConf.ConfVars.values().length); + for (HiveConf.ConfVars var : HiveConf.ConfVars.values()) { + hiveConfDefaults.put(var.toString(), var.getDefaultValue()); + } + } + public static boolean checkJobContextIfRunningFromBackend(JobContext j) { if (j.getConfiguration().get("pig.job.converted.fetch", "").equals("") && j.getConfiguration().get("mapred.task.id", "").equals("") && @@ -588,6 +599,47 @@ public static void closeHiveClientQuietly(IMetaStoreClient client) { } } + private static Configuration getHiveSiteContentsFromClasspath() { + Configuration configuration = new Configuration(false); // Don't load defaults. + configuration.addResource("hive-site.xml"); // NOTE: hive-site.xml is only available on client, not AM. 
+ return configuration; + } + + private static Properties getHiveSiteOverrides(Configuration jobConf) { + return getHiveSiteOverrides(getHiveSiteContentsFromClasspath(), jobConf); + } + + /** + * Returns the hive-site.xml config settings which do not appear in jobConf or + * the hive-site.xml config settings which appear in jobConf, but have a + * different value than HiveConf code defaults. + * @param hiveConf the config settings as found in the hive-site.xml only. + * @param jobConf the config settings used to launch the job. + * @return the set difference between hiveConf and jobConf. + */ + private static Properties getHiveSiteOverrides(Configuration hiveConf, Configuration jobConf) { + // return (hiveConf - jobConf); + Properties difference = new Properties(); + for (Map.Entry keyValue : hiveConf) { + String key = keyValue.getKey(); + String hiveSiteValue = keyValue.getValue(); + String configValue = jobConf.getRaw(key); + + if (configValue == null) { + difference.put(key, hiveSiteValue); + } else if (hiveConfDefaults.containsKey(key)) { + // Necessary to compare against HiveConf defaults as hive-site.xml is not available on task nodes (like AM). + if (! configValue.equals(hiveConfDefaults.get(key))) { + difference.put(key, configValue); + } + } + } + + LOG.info("Configuration differences=" + difference); + + return difference; + } + public static HiveConf getHiveConf(Configuration conf) throws IOException { @@ -595,30 +647,23 @@ public static HiveConf getHiveConf(Configuration conf) //copy the hive conf into the job conf and restore it //in the backend context - if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) { - conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, - HCatUtil.serialize(hiveConf.getAllProperties())); + if (StringUtils.isBlank(conf.get(HCatConstants.HCAT_KEY_HIVE_CONF))) { + // Called once on the client. + LOG.info(HCatConstants.HCAT_KEY_HIVE_CONF + " not set. 
Generating configuration differences."); + + Properties differences = getHiveSiteOverrides(conf); + + // Must set this key even if differences is empty otherwise client and AM will attempt + // to set this multiple times. + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(differences)); } else { - //Copy configuration properties into the hive conf + // Called one or more times on the client and AM. + LOG.info(HCatConstants.HCAT_KEY_HIVE_CONF + " is set. Applying configuration differences."); + Properties properties = (Properties) HCatUtil.deserialize( - conf.get(HCatConstants.HCAT_KEY_HIVE_CONF)); - - for (Map.Entry prop : properties.entrySet()) { - if (prop.getValue() instanceof String) { - hiveConf.set((String) prop.getKey(), (String) prop.getValue()); - } else if (prop.getValue() instanceof Integer) { - hiveConf.setInt((String) prop.getKey(), - (Integer) prop.getValue()); - } else if (prop.getValue() instanceof Boolean) { - hiveConf.setBoolean((String) prop.getKey(), - (Boolean) prop.getValue()); - } else if (prop.getValue() instanceof Long) { - hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue()); - } else if (prop.getValue() instanceof Float) { - hiveConf.setFloat((String) prop.getKey(), - (Float) prop.getValue()); - } - } + conf.get(HCatConstants.HCAT_KEY_HIVE_CONF)); + + storePropertiesToHiveConf(properties, hiveConf); } if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { @@ -629,6 +674,25 @@ public static HiveConf getHiveConf(Configuration conf) return hiveConf; } + public static HiveConf storePropertiesToHiveConf(Properties properties, HiveConf hiveConf) + throws IOException { + for (Map.Entry prop : properties.entrySet()) { + if (prop.getValue() instanceof String) { + hiveConf.set((String) prop.getKey(), (String) prop.getValue()); + } else if (prop.getValue() instanceof Integer) { + hiveConf.setInt((String) prop.getKey(), (Integer) prop.getValue()); + } else if (prop.getValue() instanceof Boolean) { + 
hiveConf.setBoolean((String) prop.getKey(), (Boolean) prop.getValue()); + } else if (prop.getValue() instanceof Long) { + hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue()); + } else if (prop.getValue() instanceof Float) { + hiveConf.setFloat((String) prop.getKey(), (Float) prop.getValue()); + } else { + LOG.warn("Unsupported type: key=" + prop.getKey() + " value=" + prop.getValue()); + } + } + return hiveConf; + } public static JobConf getJobConfFromContext(JobContext jobContext) { JobConf jobConf; @@ -642,6 +706,30 @@ public static JobConf getJobConfFromContext(JobContext jobContext) { return jobConf; } + // Retrieve settings in HiveConf that aren't also set in the JobConf. + public static Map getHCatKeyHiveConf(JobConf conf) { + try { + Properties properties = null; + + if (! StringUtils.isBlank(conf.get(HCatConstants.HCAT_KEY_HIVE_CONF))) { + properties = (Properties) HCatUtil.deserialize( + conf.get(HCatConstants.HCAT_KEY_HIVE_CONF)); + + LOG.info(HCatConstants.HCAT_KEY_HIVE_CONF + " is set. Using differences=" + properties); + } else { + LOG.info(HCatConstants.HCAT_KEY_HIVE_CONF + " not set. Generating configuration differences."); + + properties = getHiveSiteOverrides(conf); + } + + // This method may not be safe as it can throw an NPE if a key or value is null. 
+ return Maps.fromProperties(properties); + } + catch (IOException e) { + throw new IllegalStateException("Failed to deserialize hive conf", e); + } + } + public static void copyJobPropertiesToJobConf( Map jobProperties, JobConf jobConf) { for (Map.Entry entry : jobProperties.entrySet()) { @@ -649,7 +737,6 @@ public static void copyJobPropertiesToJobConf( } } - public static boolean isHadoop23() { String version = org.apache.hadoop.util.VersionInfo.getVersion(); if (version.matches("\\b0\\.23\\..+\\b")||version.matches("\\b2\\..*")) diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java index 9caff7a97d..ec1e1ca1bb 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java @@ -126,16 +126,20 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema) } HiveStorageHandler storageHandler; - JobConf jobConf; + Map hiveProps = null; //For each matching partition, call getSplits on the underlying InputFormat for (PartInfo partitionInfo : partitionInfoList) { - jobConf = HCatUtil.getJobConfFromContext(jobContext); + JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext); + if (hiveProps == null) { + hiveProps = HCatUtil.getHCatKeyHiveConf(jobConf); + } List setInputPath = setInputPath(jobConf, partitionInfo.getLocation()); if (setInputPath.isEmpty()) { continue; } Map jobProperties = partitionInfo.getJobProperties(); + HCatUtil.copyJobPropertiesToJobConf(hiveProps, jobConf); HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf); storageHandler = HCatUtil.getStorageHandler(