diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 301159e..ef5ed6f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1595,6 +1595,16 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime\n" + "would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."), + HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE("hive.optimize.update.table.properties.from.serde", false, + "Whether to update table-properties by initializing tables' SerDe instances during logical-optimization. \n" + + "By doing so, certain SerDe classes (like AvroSerDe) can pre-calculate table-specific information, and \n" + + "store it in table-properties, to be used later in the SerDe, while running the job."), + + HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE_LIST("hive.optimize.update.table.properties.from.serde.list", + "org.apache.hadoop.hive.serde2.avro.AvroSerDe", + "The comma-separated list of SerDe classes that are considered when enhancing table-properties \n" + + "during logical optimization."), + // CTE HIVE_CTE_MATERIALIZE_THRESHOLD("hive.optimize.cte.materialize.threshold", -1, "If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index eaf0abc..aee0293 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -229,6 +229,10 @@ public void initialize(HiveConf hiveConf) { transformations.add(new SimpleFetchAggregation()); } + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE)) { + transformations.add(new TablePropertyEnrichmentOptimizer()); + } + } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java new file mode 100644 index 0000000..98acb0d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java @@ -0,0 +1,125 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.PreOrderWalker; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde2.Deserializer; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Stack; + +/** + * Optimizer that updates TableScanOperators' Table-references with properties that might be + * updated/pre-fetched by initializing the table's SerDe. + * E.g. AvroSerDes can now prefetch schemas from schema-urls and update the table-properties directly. + */ +class TablePropertyEnrichmentOptimizer extends Transform { + + private static Log LOG = LogFactory.getLog(TablePropertyEnrichmentOptimizer.class); + + private static class WalkerCtx implements NodeProcessorCtx { + + Configuration conf; + Set serdeClassesUnderConsideration = Sets.newHashSet(); + + WalkerCtx(Configuration conf) { + this.conf = conf; + serdeClassesUnderConsideration.addAll( + Arrays.asList( HiveConf.getVar(conf, + HiveConf.ConfVars.HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE_LIST) + .split(","))); + + if (LOG.isDebugEnabled()) { + LOG.debug("TablePropertyEnrichmentOptimizer considers these SerDe classes:"); + for (String className : serdeClassesUnderConsideration) { + LOG.debug(className); + } + } + } + } + + private static class Processor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + TableScanOperator tsOp = (TableScanOperator) nd; + WalkerCtx context = (WalkerCtx)procCtx; + + TableScanDesc tableScanDesc = tsOp.getConf(); + Table table = tsOp.getConf().getTableMetadata().getTTable(); + Map tableParameters = table.getParameters(); + Properties tableProperties = new Properties(); + tableProperties.putAll(tableParameters); + + Deserializer deserializer = tableScanDesc.getTableMetadata().getDeserializer(); + String deserializerClassName = deserializer.getClass().getName(); + try { + if (context.serdeClassesUnderConsideration.contains(deserializerClassName)) { + deserializer.initialize(context.conf, tableProperties); + LOG.debug("SerDe init succeeded for class: " + deserializerClassName); + for (Map.Entry property : tableProperties.entrySet()) { + if (!property.getValue().equals(tableParameters.get(property.getKey()))) { + LOG.debug("Resolving changed parameters! key=" + property.getKey() + ", value=" + property.getValue()); + tableParameters.put((String) property.getKey(), (String) property.getValue()); + } + } + } + else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping prefetch for " + deserializerClassName); + } + } + } + catch(Throwable t) { + LOG.error("SerDe init failed for SerDe class==" + deserializerClassName + + ". Didn't change table-properties", t); + } + + return nd; + } + } + + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + LOG.info("TablePropertyEnrichmentOptimizer::transform()."); + + Map opRules = Maps.newLinkedHashMap(); + opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"), + new Processor()); + + WalkerCtx context = new WalkerCtx(pctx.getConf()); + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context); + + List topNodes = Lists.newArrayList(); + topNodes.addAll(pctx.getTopOps().values()); + + GraphWalker walker = new PreOrderWalker(disp); + walker.startWalking(topNodes, null); + + LOG.info("TablePropertyEnrichmentOptimizer::transform() complete!"); + return pctx; + } +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java index 0be54e0..2e970e3 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java @@ -88,6 +88,9 @@ public void initialize(Configuration configuration, Properties properties) throw LOG.debug("Resetting already initialized AvroSerDe"); } + LOG.info("AvroSerde::initialize(): Preset value of avro.schema.literal == " + + properties.get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName())); + schema = null; oi = null; columnNames = null; @@ -107,9 +110,10 @@ public void initialize(Configuration configuration, Properties properties) throw columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); schema = getSchemaFromCols(properties, columnNames, columnTypes, columnCommentProperty); - properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString()); } + properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString()); + if (LOG.isDebugEnabled()) { LOG.debug("Avro schema is " + schema); }