diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
index 2f07be1..06cecfa 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
@@ -141,6 +141,8 @@ private static InputJobInfo getInputJobInfo(
       }
       inputJobInfo.setPartitions(partInfoList);
 
+      SpecialCases.addSpecialCasesParametersToInputJobProperties(conf, inputJobInfo, table.getInputFormatClass());
+
       return inputJobInfo;
     } finally {
       HCatUtil.closeHiveClientQuietly(client);
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java
index 3100181..74e3b1c 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java
@@ -173,6 +173,7 @@ private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
 
     //add props from params set in table schema
     props.putAll(info.getStorerInfo().getProperties());
+    props.putAll(info.getTable().getParameters());
 
     return props;
   }
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
index 60af5c0..04d8124 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
@@ -22,22 +22,32 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
 import org.apache.orc.OrcConf;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.avro.StrictAvroSerDe;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer;
+
 /**
  * This class is a place to put all the code associated with
  * Special cases. If there is a corner case required to make
@@ -56,6 +66,43 @@
 
   static final private Logger LOG = LoggerFactory.getLogger(SpecialCases.class);
 
+  public static void addSpecialCasesParametersToInputJobProperties(
+      Configuration conf, InputJobInfo jobInfo, Class<? extends InputFormat> ifClass) throws Exception {
+
+    LOG.debug("SpecialCases::addSpecialCasesParametersToInputJobProperties().");
+    // Prefetch the Avro table schema from the schema URL.
+    if (ifClass == AvroContainerInputFormat.class) {
+      Table table = jobInfo.getTableInfo().getTable();
+      LOG.debug("Avro table found: " + table.getDbName() + "." + table.getTableName());
+      Deserializer deserializer = MetaStoreUtils.getDeserializer(conf, table, false);
+      assert (deserializer != null && (deserializer.getClass().equals(AvroSerDe.class)
+          || deserializer.getClass().equals(StrictAvroSerDe.class)));
+
+      LOG.debug("Deserializer class: " + deserializer.getClass().getCanonicalName());
+      Map<String, String> tableParameters = table.getParameters();
+      Properties tableProperties = new Properties();
+      tableProperties.putAll(tableParameters);
+
+      try {
+        deserializer.initialize(conf, tableProperties); // Should set avro.schema.literal, from the URL.
+        LOG.debug("SerDe init succeeded for class: " + deserializer.getClass().getCanonicalName());
+        for (Map.Entry<Object, Object> property : tableProperties.entrySet()) {
+          if (!property.getValue().equals(tableParameters.get(property.getKey()))) {
+            LOG.debug("Resolving changed parameter: key=" + property.getKey() + ", value=" + property.getValue());
+            tableParameters.put((String) property.getKey(), (String) property.getValue());
+          } else {
+            LOG.debug("Skipping resolution for: " + property.getKey());
+          }
+        }
+      } catch (Throwable t) {
+        LOG.error("SerDe initialization failed for: " + deserializer.getClass(), t);
+      }
+      // tableParameters now carries the resolved table properties.
+    }
+  }
+
   /**
    * Method to do any file-format specific special casing while
    * instantiating a storage handler to write. We set any parameters
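
For reviewers, a minimal sketch of the read path this patch targets (not part of the diff). The database and table names are placeholders, and it assumes the table's metadata carries avro.schema.url; with the patch applied, setInput() should serialize an InputJobInfo whose table parameters already include the resolved avro.schema.literal, so task-side SerDe initialization never re-fetches the URL:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

public class AvroSchemaPrefetchCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "avro-schema-prefetch-check");

    // setInput() calls InitializeInput.getInputJobInfo(), which with this patch
    // invokes SpecialCases.addSpecialCasesParametersToInputJobProperties() for
    // Avro-backed tables before the InputJobInfo is serialized into the job conf.
    HCatInputFormat.setInput(job, "mydb", "avro_events"); // placeholder db/table
    job.setInputFormatClass(HCatInputFormat.class);

    // The serialized InputJobInfo travels under HCatConstants.HCAT_KEY_JOB_INFO;
    // with this change its table parameters should carry avro.schema.literal.
    System.out.println("InputJobInfo set: "
        + (job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO) != null));
  }
}

Running this requires a live metastore with such a table; the point is only where schema resolution now happens: once on the client, instead of once per task via the schema URL.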