From 9e74420f2c350a453f5cb3e60d6d1d0a89a1a84a Mon Sep 17 00:00:00 2001
From: Syed Albiz
Date: Wed, 10 Aug 2011 01:14:01 -0700
Subject: [PATCH 1/1] yucky sem analyzer peppering

diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b46976f..3c65e18 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -367,6 +367,7 @@ public class HiveConf extends Configuration {
     // Optimizer
     HIVEOPTCP("hive.optimize.cp", true), // column pruner
     HIVEOPTINDEXFILTER("hive.optimize.index.filter", false), // automatically use indexes
+    HIVEINDEXAUTOUPDATE("hive.optimize.index.autoupdate", true), // automatically update stale indexes
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
     HIVEPPDREMOVEDUPLICATEFILTERS("hive.ppd.remove.duplicatefilters", true),
     // push predicates down to storage handlers
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
new file mode 100644
index 0000000..0abb891
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.index.HiveIndexQueryContext;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+
+public class IndexUpdater {
+  private List<LoadTableDesc> loadTableWork;
+  private HiveConf conf;
+  private Hive hive;
+  private List<Task<? extends Serializable>> tasks;
+  private ParseContext pctx;
+
+
+  public IndexUpdater(List<LoadTableDesc> loadTableWork, ParseContext pctx) {
+    this.loadTableWork = loadTableWork;
+    this.pctx = pctx;
+    this.conf = new HiveConf(pctx.getConf(), IndexUpdater.class);
+    this.tasks = new LinkedList<Task<? extends Serializable>>();
+  }
+
+  public List<Task<? extends Serializable>> generateUpdateTasks() throws
+    HiveException {
+    hive = Hive.get(this.conf);
+    for (LoadTableDesc ltd : loadTableWork) {
+      TableDesc td = ltd.getTable();
+      Table srcTable = hive.getTable(td.getTableName());
+      List<Index> tblIndexes = srcTable.getAllIndexes((short)-1);
+      Map<String, String> partSpec = ltd.getPartitionSpec();
+      if (partSpec == null || partSpec.size() == 0) {
+        // unpartitioned table, update whole index
+        doIndexUpdate(tblIndexes);
+      } else {
+        doIndexUpdate(tblIndexes, partSpec);
+      }
+    }
+    return tasks;
+  }
+
+  private void doIndexUpdate(List<Index> tblIndexes) throws HiveException {
+    Driver driver = new Driver(this.conf);
+    for (Index idx : tblIndexes) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("ALTER INDEX ");
+      sb.append(idx.getIndexName());
+      sb.append(" ON ");
+      sb.append(idx.getOrigTableName());
+      sb.append(" REBUILD");
+      driver.compile(sb.toString());
+      tasks.addAll(driver.getPlan().getRootTasks());
+      Set<ReadEntity> inputs = pctx.getSemanticInputs();
+      inputs.addAll(driver.getPlan().getInputs());
+    }
+  }
+
+  private void doIndexUpdate(List<Index> tblIndexes, Map<String, String>
+      partSpec) throws HiveException {
+    for (Index index : tblIndexes) {
+      if (containsPartition(index, partSpec)) {
+        doIndexUpdate(index, partSpec);
+      }
+    }
+  }
+
+  private void doIndexUpdate(Index index, Map<String, String> partSpec) throws
+    HiveException {
+    StringBuilder ps = new StringBuilder();
+    boolean first = true;
+    ps.append("(");
+    for (String key : partSpec.keySet()) {
+      if (!first) {
+        ps.append(", ");
+      } else {
+        first = false;
+      }
+      ps.append(key);
+      ps.append("=");
+      ps.append(partSpec.get(key));
+    }
+    ps.append(")");
+    StringBuilder sb = new StringBuilder();
+    sb.append("ALTER INDEX ");
+    sb.append(index.getIndexName());
+    sb.append(" ON ");
+    sb.append(index.getOrigTableName());
+    sb.append(" PARTITION ");
+    sb.append(ps.toString());
+    sb.append(" REBUILD");
+    Driver driver = new Driver(this.conf);
+    driver.compile(sb.toString(), false);
+    tasks.addAll(driver.getPlan().getRootTasks());
+    Set<ReadEntity> inputs = pctx.getSemanticInputs();
+    inputs.addAll(driver.getPlan().getInputs());
+  }
+
+
+  private boolean containsPartition(Index index, Map<String, String> partSpec)
+    throws HiveException {
+    Table indexTable = hive.getTable(index.getIndexTableName());
+    List<Partition> parts = hive.getPartitions(indexTable, partSpec);
+    return (parts == null || parts.size() == 0);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index cec0d46..fd320d3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6828,8 +6828,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
           conf);
       mvTask.add(tsk);
+      // Check whether this load is staling any indexes, and auto-update them if requested
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
+        IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, getParseContext());
+        try {
+          List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater.generateUpdateTasks();
+          for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
+            tsk.addDependentTask(updateTask);
+          }
+        } catch (HiveException e) {
+          console.printInfo("WARNING: could not auto-update stale indexes, which are now out of sync with the base table");
+        }
+      }
     }
+
     boolean oneLoadFile = true;
     for (LoadFileDesc lfd : loadFileWork) {
       if (qb.isCTAS()) {
diff --git ql/src/test/queries/clientpositive/index_auto_update.q ql/src/test/queries/clientpositive/index_auto_update.q
new file mode 100644
index 0000000..35490dc
--- /dev/null
+++ ql/src/test/queries/clientpositive/index_auto_update.q
@@ -0,0 +1,24 @@
+-- Test if index is actually being used.
+
+-- Create temp, and populate it with some values in src.
+CREATE TABLE temp(key STRING, val STRING) STORED AS TEXTFILE;
+INSERT OVERWRITE TABLE temp SELECT * FROM src WHERE key < 50;
+
+-- Build an index on temp.
+CREATE INDEX temp_index ON TABLE temp(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX temp_index ON temp REBUILD;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+SET hive.optimize.index.filter=true;
+SET hive.optimize.index.filter.compact.minsize=0;
+
+-- overwrite temp table so index is out of date
+INSERT OVERWRITE TABLE temp SELECT * FROM src;
+
+-- query should return indexed values
+EXPLAIN SELECT * FROM temp WHERE key = 86;
+SELECT * FROM temp WHERE key = 86;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+SET hive.optimize.index.filter=false;
+DROP table temp;
diff --git ql/src/test/results/clientpositive/index_auto_update.q.out ql/src/test/results/clientpositive/index_auto_update.q.out
new file mode 100644
index 0000000..483c977
--- /dev/null
+++ ql/src/test/results/clientpositive/index_auto_update.q.out
@@ -0,0 +1,209 @@
+PREHOOK: query: -- Test if index is actually being used.
+
+-- Create temp, and populate it with some values in src.
+CREATE TABLE temp(key STRING, val STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- Test if index is actually being used.
+
+-- Create temp, and populate it with some values in src.
+CREATE TABLE temp(key STRING, val STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@temp
+PREHOOK: query: INSERT OVERWRITE TABLE temp SELECT * FROM src WHERE key < 50
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@temp
+POSTHOOK: query: INSERT OVERWRITE TABLE temp SELECT * FROM src WHERE key < 50
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@temp
+POSTHOOK: Lineage: temp.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: temp.val SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: -- Build an index on temp.
+CREATE INDEX temp_index ON TABLE temp(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: -- Build an index on temp.
+CREATE INDEX temp_index ON TABLE temp(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+POSTHOOK: Lineage: temp.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: temp.val SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: ALTER INDEX temp_index ON temp REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@temp
+PREHOOK: Output: default@default__temp_temp_index__
+POSTHOOK: query: ALTER INDEX temp_index ON temp REBUILD
+POSTHOOK: type: ALTERINDEX_REBUILD
+POSTHOOK: Input: default@temp
+POSTHOOK: Output: default@default__temp_temp_index__
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: temp.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: temp.val SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: -- overwrite temp table so index is out of date
+INSERT OVERWRITE TABLE temp SELECT * FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@temp
+PREHOOK: Output: default@temp
+POSTHOOK: query: -- overwrite temp table so index is out of date
+INSERT OVERWRITE TABLE temp SELECT * FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@temp
+POSTHOOK: Output: default@temp
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: temp.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: temp.val SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: -- query should return indexed values
+EXPLAIN SELECT * FROM temp WHERE key = 86
+PREHOOK: type: QUERY
+POSTHOOK: query: -- query should return indexed values
+EXPLAIN SELECT * FROM temp WHERE key = 86
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: temp.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: temp.val SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME temp))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 86))))
+
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-6 depends on stages: Stage-3 , consists of Stage-5, Stage-4
+  Stage-5
+  Stage-2 depends on stages: Stage-5, Stage-4
+  Stage-1 depends on stages: Stage-2
+  Stage-4
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-3
+    Map Reduce
+      Alias -> Map Operator Tree:
+        default__temp_temp_index__
+          TableScan
+            alias: default__temp_temp_index__
+            filterExpr:
+                expr: (key = 86)
+                type: boolean
+            Filter Operator
+              predicate:
+                  expr: (key = 86)
+                  type: boolean
+              Select Operator
+                expressions:
+                      expr: _bucketname
+                      type: string
+                      expr: _offsets
+                      type: array<bigint>
+                outputColumnNames: _col0, _col1
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 1
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-6
+    Conditional Operator
+
+  Stage: Stage-5
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: file:/Users/salbiz/dev/hive/build/ql/scratchdir/hive_2011-08-10_00-36-37_964_8643203667590642810/-ext-10000
+
+  Stage: Stage-2
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: file:/var/folders/5V/5V4Zq77qGD4aSK9m8V3frVsFdRU/-Tmp-/salbiz/hive_2011-08-10_00-36-37_813_1250616336923583381/-mr-10002
+
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        temp
+          TableScan
+            alias: temp
+            filterExpr:
+                expr: (key = 86)
+                type: boolean
+            Filter Operator
+              predicate:
+                  expr: (key = 86)
+                  type: boolean
+              Select Operator
+                expressions:
+                      expr: key
+                      type: string
+                      expr: val
+                      type: string
+                outputColumnNames: _col0, _col1
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-4
+    Map Reduce
+      Alias -> Map Operator Tree:
+        file:/Users/salbiz/dev/hive/build/ql/scratchdir/hive_2011-08-10_00-36-37_964_8643203667590642810/-ext-10001
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: SELECT * FROM temp WHERE key = 86
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__temp_temp_index__
+PREHOOK: Input: default@temp
+PREHOOK: Output: file:/var/folders/5V/5V4Zq77qGD4aSK9m8V3frVsFdRU/-Tmp-/salbiz/hive_2011-08-10_00-36-38_074_529064073417161421/-mr-10000
+POSTHOOK: query: SELECT * FROM temp WHERE key = 86
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@default__temp_temp_index__
+POSTHOOK: Input: default@temp
+POSTHOOK: Output: file:/var/folders/5V/5V4Zq77qGD4aSK9m8V3frVsFdRU/-Tmp-/salbiz/hive_2011-08-10_00-36-38_074_529064073417161421/-mr-10000
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: temp.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: temp.val SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+86	val_86
+PREHOOK: query: DROP table temp
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@temp
+PREHOOK: Output: default@temp
+POSTHOOK: query: DROP table temp
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@temp
+POSTHOOK: Output: default@temp
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._bucketname SIMPLE [(temp)temp.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__._offsets EXPRESSION [(temp)temp.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: default__temp_temp_index__.key SIMPLE [(temp)temp.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: temp.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: temp.val SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
-- 
1.7.4.4