diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 34359c1..176c4a4 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1260,14 +1260,14 @@
     HIVEOUTERJOINSUPPORTSFILTERS("hive.outerjoin.supports.filters", true, ""),
 
-    HIVEFETCHTASKCONVERSION("hive.fetch.task.conversion", "minimal", new StringSet("minimal", "more"),
+    HIVEFETCHTASKCONVERSION("hive.fetch.task.conversion", "more", new StringSet("minimal", "more"),
         "Some select queries can be converted to single FETCH task minimizing latency.\n" +
         "Currently the query should be single sourced not having any subquery and should not have\n" +
         "any aggregations or distincts (which incurs RS), lateral views and joins.\n" +
         "1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only\n" +
         "2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)\n"
     ),
-    HIVEFETCHTASKCONVERSIONTHRESHOLD("hive.fetch.task.conversion.threshold", -1l,
+    HIVEFETCHTASKCONVERSIONTHRESHOLD("hive.fetch.task.conversion.threshold", 1073741824L,
         "Input threshold for applying hive.fetch.task.conversion. If target table is native, input length\n" +
         "is calculated by summation of file lengths. If it's not native, storage handler for the table\n" +
         "can optionally implement org.apache.hadoop.hive.ql.metadata.InputEstimator interface."),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 653f5cc..8dc5c15 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2219,7 +2219,7 @@
 <property>
   <name>hive.fetch.task.conversion</name>
-  <value>minimal</value>
+  <value>more</value>
   <description>
     Some select queries can be converted to single FETCH task minimizing latency.
     Currently the query should be single sourced not having any subquery and should not have
@@ -2230,7 +2230,7 @@
 <property>
   <name>hive.fetch.task.conversion.threshold</name>
-  <value>-1</value>
+  <value>1073741824</value>
   <description>
     Input threshold for applying hive.fetch.task.conversion. If target table is native, input length
    is calculated by summation of file lengths. If it's not native, storage handler for the table
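Reviewer note: the two settings above can also be read and overridden per session through the HiveConf API. The ConfVars names come from the hunk above; the standalone wrapper class below is illustrative only, a minimal sketch rather than part of the patch.

import org.apache.hadoop.hive.conf.HiveConf;

public class FetchConversionDefaults {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // With this patch the defaults become "more" and 1073741824 bytes (1 GB).
    String mode = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHTASKCONVERSION);
    long threshold = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
    System.out.println("mode=" + mode + ", threshold=" + threshold);

    // The pre-patch behavior ("minimal", no size cap) remains available as an override.
    conf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "minimal");
    conf.setLongVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD, -1L);
  }
}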
diff --git data/conf/hive-site.xml data/conf/hive-site.xml
index 37ac8c0..fe8080a 100644
--- data/conf/hive-site.xml
+++ data/conf/hive-site.xml
@@ -235,5 +235,9 @@
 </property>
 
+<property>
+  <name>hive.fetch.task.conversion</name>
+  <value>minimal</value>
+</property>
 
 </configuration>
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 7413d2b..c856623 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -27,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
@@ -106,9 +108,9 @@ private FetchTask optimize(ParseContext pctx, String alias, TableScanOperator source)
         pctx.getConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION);
     boolean aggressive = "more".equals(mode);
+    final int limit = pctx.getQB().getParseInfo().getOuterQueryLimit();
     FetchData fetch = checkTree(aggressive, pctx, alias, source);
-    if (fetch != null && checkThreshold(fetch, pctx)) {
-      int limit = pctx.getQB().getParseInfo().getOuterQueryLimit();
+    if (fetch != null && checkThreshold(fetch, limit, pctx)) {
       FetchWork fetchWork = fetch.convertToWork();
       FetchTask fetchTask = (FetchTask) TaskFactory.get(fetchWork, pctx.getConf());
       fetchWork.setSink(fetch.completed(pctx, fetchWork));
@@ -119,7 +121,10 @@
     return null;
   }
 
-  private boolean checkThreshold(FetchData data, ParseContext pctx) throws Exception {
+  private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) throws Exception {
+    if (limit > 0 && data.hasOnlyPruningFilter()) {
+      return true;
+    }
     long threshold = HiveConf.getLongVar(pctx.getConf(),
         HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
     if (threshold < 0) {
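Reviewer note: the hunk above is the heart of HIVE-7397. With a LIMIT present and every predicate already consumed by partition pruning, conversion is accepted before any input-size estimate is attempted. Below is a standalone sketch of that decision order, using hypothetical stand-ins for Hive's internal types.

// Hypothetical stand-in for the relevant bit of Hive's FetchData.
final class FetchDataSketch {
  private final boolean onlyPruningFilter;
  FetchDataSketch(boolean onlyPruningFilter) { this.onlyPruningFilter = onlyPruningFilter; }
  boolean hasOnlyPruningFilter() { return onlyPruningFilter; }
}

final class ThresholdCheckSketch {
  // Mirrors the patched flow: LIMIT plus a pruning-only filter short-circuits to true;
  // a negative threshold disables the size check; otherwise compare the input length.
  static boolean checkThreshold(FetchDataSketch data, int limit, long threshold, long inputLength) {
    if (limit > 0 && data.hasOnlyPruningFilter()) {
      return true; // the fetch reads at most `limit` rows, so input size is irrelevant
    }
    if (threshold < 0) {
      return true; // size check disabled
    }
    return inputLength <= threshold;
  }
}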
@@ -169,7 +174,7 @@ private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias,
       PrunedPartitionList pruned = pctx.getPrunedPartitions(alias, ts);
       if (aggressive || !pruned.hasUnknownPartitions()) {
         bypassFilter &= !pruned.hasUnknownPartitions();
-        return checkOperators(new FetchData(parent, table, pruned, splitSample), ts,
+        return checkOperators(new FetchData(parent, table, pruned, splitSample, bypassFilter), ts,
             aggressive, bypassFilter);
       }
     }
@@ -211,6 +216,7 @@ private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean
     private final SplitSample splitSample;
     private final PrunedPartitionList partsList;
     private final HashSet<ReadEntity> inputs = new HashSet<ReadEntity>();
+    private final boolean onlyPruningFilter;
 
     // source table scan
     private TableScanOperator scanOp;
@@ -223,14 +229,23 @@ private FetchData(ReadEntity parent, Table table, SplitSample splitSample) {
       this.table = table;
       this.partsList = null;
       this.splitSample = splitSample;
+      this.onlyPruningFilter = false;
     }
 
     private FetchData(ReadEntity parent, Table table, PrunedPartitionList partsList,
-        SplitSample splitSample) {
+        SplitSample splitSample, boolean bypassFilter) {
       this.parent = parent;
       this.table = table;
       this.partsList = partsList;
       this.splitSample = splitSample;
+      this.onlyPruningFilter = bypassFilter;
+    }
+
+    /*
+     * all filters were executed during partition pruning
+     */
+    public boolean hasOnlyPruningFilter() {
+      return this.onlyPruningFilter;
     }
 
     private FetchWork convertToWork() throws HiveException {
@@ -317,7 +332,12 @@ private long getFileLength(JobConf conf, Path path, Class<? extends InputFormat> clazz)
       InputFormat input = HiveInputFormat.getInputFormatFromCache(clazz, conf);
       summary = ((ContentSummaryInputFormat)input).getContentSummary(path, conf);
     } else {
-      summary = path.getFileSystem(conf).getContentSummary(path);
+      FileSystem fs = path.getFileSystem(conf);
+      try {
+        summary = fs.getContentSummary(path);
+      } catch (FileNotFoundException e) {
+        return 0;
+      }
     }
     return summary.getLength();
   }
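Reviewer note: the getFileLength change above makes the size estimate tolerant of inputs that vanish between compilation and summation, for example a concurrently dropped partition directory. The same pattern in isolation against the stock Hadoop FileSystem API; the class and helper names are illustrative.

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SafeContentLength {
  // Returns the total length under path, or 0 if the path no longer exists,
  // so a missing input counts as zero bytes toward the conversion threshold.
  static long lengthOrZero(Configuration conf, Path path) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    try {
      ContentSummary summary = fs.getContentSummary(path);
      return summary.getLength();
    } catch (FileNotFoundException e) {
      return 0;
    }
  }
}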
diff --git ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
index e6343e2..b1a7cb5 100644
--- ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
+++ ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
@@ -5,5 +5,6 @@ explain select cast(key as int) * 10, upper(value) from src limit 10;
 
 set hive.fetch.task.conversion.threshold=100;
 
+-- from HIVE-7397, limit + partition pruning filter
 explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
 explain select cast(key as int) * 10, upper(value) from src limit 10;
diff --git ql/src/test/results/clientpositive/nonmr_fetch_threshold.q.out ql/src/test/results/clientpositive/nonmr_fetch_threshold.q.out
index 39cdfa6..6d5ea34 100644
--- ql/src/test/results/clientpositive/nonmr_fetch_threshold.q.out
+++ ql/src/test/results/clientpositive/nonmr_fetch_threshold.q.out
@@ -46,41 +46,31 @@ STAGE PLANS:
           Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
           ListSink
 
-PREHOOK: query: explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
+PREHOOK: query: -- from HIVE-7397, limit + partition pruning filter
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
 PREHOOK: type: QUERY
-POSTHOOK: query: explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
+POSTHOOK: query: -- from HIVE-7397, limit + partition pruning filter
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-0 is a root stage
 
 STAGE PLANS:
-  Stage: Stage-1
-    Map Reduce
-      Map Operator Tree:
-          TableScan
-            alias: srcpart
-            Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
-            Select Operator
-              expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
-              outputColumnNames: _col0, _col1, _col2, _col3
-              Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
-              Limit
-                Number of rows: 10
-                Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
-                  table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
   Stage: Stage-0
     Fetch Operator
       limit: 10
       Processor Tree:
-        ListSink
+        TableScan
+          alias: srcpart
+          Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+            Limit
+              Number of rows: 10
+              Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
+              ListSink
 
 PREHOOK: query: explain select cast(key as int) * 10, upper(value) from src limit 10
 PREHOOK: type: QUERY
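Reviewer note: the golden-file change above can be reproduced interactively by running the same EXPLAIN and checking that the plan now consists of a single root Stage-0 with a Fetch Operator. A hedged JDBC sketch, assuming a HiveServer2 instance at the placeholder URL below:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ExplainFetchCheck {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // URL and database are placeholders; credentials omitted.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      stmt.execute("set hive.fetch.task.conversion.threshold=100");
      try (ResultSet rs = stmt.executeQuery(
          "explain select * from srcpart where ds='2008-04-08' and hr='11' limit 10")) {
        while (rs.next()) {
          // Post-patch output starts with "Stage-0 is a root stage".
          System.out.println(rs.getString(1));
        }
      }
    }
  }
}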
diff --git ql/src/test/results/clientpositive/tez/bucket2.q.out ql/src/test/results/clientpositive/tez/bucket2.q.out
index c953757..66eba00 100644
--- ql/src/test/results/clientpositive/tez/bucket2.q.out
+++ ql/src/test/results/clientpositive/tez/bucket2.q.out
@@ -199,39 +199,24 @@ POSTHOOK: query: explain
 select * from bucket2_1 tablesample (bucket 1 out of 2) s
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-0 is a root stage
 
 STAGE PLANS:
-  Stage: Stage-1
-    Tez
-#### A masked pattern was here ####
-      Vertices:
-        Map 1
-            Map Operator Tree:
-                TableScan
-                  alias: s
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (((hash(key) & 2147483647) % 2) = 0) (type: boolean)
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: int), value (type: string)
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
   Stage: Stage-0
     Fetch Operator
       limit: -1
       Processor Tree:
-        ListSink
+        TableScan
+          alias: s
+          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+          Filter Operator
+            predicate: (((hash(key) & 2147483647) % 2) = 0) (type: boolean)
+            Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: int), value (type: string)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              ListSink
 
 PREHOOK: query: select * from bucket2_1 tablesample (bucket 1 out of 2) s
 PREHOOK: type: QUERY
diff --git ql/src/test/results/clientpositive/tez/bucket3.q.out ql/src/test/results/clientpositive/tez/bucket3.q.out
index 8507c7d..41c680e 100644
--- ql/src/test/results/clientpositive/tez/bucket3.q.out
+++ ql/src/test/results/clientpositive/tez/bucket3.q.out
@@ -222,39 +222,24 @@ POSTHOOK: query: explain
 select * from bucket3_1 tablesample (bucket 1 out of 2) s where ds = '1'
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-0 is a root stage
 
 STAGE PLANS:
-  Stage: Stage-1
-    Tez
-#### A masked pattern was here ####
-      Vertices:
-        Map 1
-            Map Operator Tree:
-                TableScan
-                  alias: s
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (((hash(key) & 2147483647) % 2) = 0) (type: boolean)
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: int), value (type: string), ds (type: string)
-                      outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
   Stage: Stage-0
     Fetch Operator
       limit: -1
      Processor Tree:
-        ListSink
+        TableScan
+          alias: s
+          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+          Filter Operator
+            predicate: (((hash(key) & 2147483647) % 2) = 0) (type: boolean)
+            Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: int), value (type: string), ds (type: string)
+              outputColumnNames: _col0, _col1, _col2
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              ListSink
 
 PREHOOK: query: select * from bucket3_1 tablesample (bucket 1 out of 2) s where ds = '1'
 PREHOOK: type: QUERY
diff --git ql/src/test/results/clientpositive/tez/bucket4.q.out ql/src/test/results/clientpositive/tez/bucket4.q.out
index 68788eb..28544f2 100644
--- ql/src/test/results/clientpositive/tez/bucket4.q.out
+++ ql/src/test/results/clientpositive/tez/bucket4.q.out
@@ -198,39 +198,24 @@ POSTHOOK: query: explain
 select * from bucket4_1 tablesample (bucket 1 out of 2) s
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-0 is a root stage
 
 STAGE PLANS:
-  Stage: Stage-1
-    Tez
-#### A masked pattern was here ####
-      Vertices:
-        Map 1
-            Map Operator Tree:
-                TableScan
-                  alias: s
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (((hash(key) & 2147483647) % 2) = 0) (type: boolean)
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: int), value (type: string)
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
   Stage: Stage-0
     Fetch Operator
      limit: -1
       Processor Tree:
-        ListSink
+        TableScan
+          alias: s
+          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+          Filter Operator
+            predicate: (((hash(key) & 2147483647) % 2) = 0) (type: boolean)
+            Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: int), value (type: string)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              ListSink
 
 PREHOOK: query: select * from bucket4_1 tablesample (bucket 1 out of 2) s
 PREHOOK: type: QUERY
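Reviewer note: all three Tez plans keep the bucket-pruning predicate (((hash(key) & 2147483647) % 2) = 0) inside the fetch task. Assuming Hive hashes a primitive int key to the value itself (consistent with the plans above), the sampling arithmetic can be checked in isolation; this helper is illustrative, not Hive code.

public class BucketSampleSketch {
  // True when the key lands in bucket 1 of numBuckets; the mask clears the
  // sign bit so the modulo stays non-negative even for negative hashes.
  static boolean inFirstBucket(int key, int numBuckets) {
    return ((key & Integer.MAX_VALUE) % numBuckets) == 0;
  }

  public static void main(String[] args) {
    System.out.println(inFirstBucket(2, 2));  // true:  2 % 2 == 0
    System.out.println(inFirstBucket(-3, 2)); // false: -3 & 0x7fffffff = 2147483645, which is odd
  }
}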