diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f3b01b2..8fa9c0c 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3190,6 +3190,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
             Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
+    SPARK_USE_FILE_SIZE_FOR_MAPJOIN("hive.spark.use.file.size.for.mapjoin", false,
+        "If this is set to true, mapjoin optimization in Hive/Spark will use source file sizes associated " +
+        "with the TableScan operator on the root of the operator tree, instead of using operator statistics."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for requests from Hive client to remote Spark driver."),
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 2c53047..c107095 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1480,7 +1480,8 @@ spark.query.files=add_part_multiple.q, \
 spark.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
-  spark_vectorized_dynamic_partition_pruning.q
+  spark_vectorized_dynamic_partition_pruning.q,\
+  spark_use_file_size_for_mapjoin.q
 
 miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   bucket4.q,\
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index d294e25..5bbfe12 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -346,4 +346,38 @@ public static void setMemoryAvailable(final List<Operator<? extends OperatorDesc>> operators,
+
+  /**
+   * findRoots returns all root operators (in roots) that result in operator op.
+   */
+  public static void findRoots(Operator<?> op, Collection<Operator<?>> roots) {
+    List<Operator<?>> parents = op.getParentOperators();
+    if (parents == null || parents.isEmpty()) {
+      roots.add(op);
+      return;
+    }
+    for (Operator<?> p : parents) {
+      findRoots(p, roots);
+    }
+  }
+
+  /**
+   * Remove the branch that contains the specified operator. Do nothing if there's no branching,
+   * i.e. all the upstream operators have only one child.
+   */
+  public static void removeBranch(Operator<?> op) {
+    Operator<?> child = op;
+    Operator<?> curr = op;
+
+    while (curr.getChildOperators().size() <= 1) {
+      child = curr;
+      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+        return;
+      }
+      curr = curr.getParentOperators().get(0);
+    }
+
+    curr.removeChild(child);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
index c41a0c8..26a1088 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
@@ -20,6 +20,7 @@ import java.util.Stack;
 
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
@@ -54,7 +54,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
     if (desc.getStatistics().getDataSize() > context.getConf()
         .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
-      GenSparkUtils.removeBranch(op);
+      OperatorUtils.removeBranch(op);
       // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
       LOG.info("Disabling dynamic pruning for: "
           + desc.getTableScan().getName()
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 7faff88..d8f37ae 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.Stack;
 
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -191,12 +193,40 @@ private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoi
     int pos = 0;
     // bigTableFound means we've encountered a table that's bigger than the
-    // max. This table is either the the big table or we cannot convert.
+    // max. This table is either the big table or we cannot convert.
     boolean bigTableFound = false;
+    boolean useTsStats = context.getConf().getBoolean(HiveConf.ConfVars.SPARK_USE_FILE_SIZE_FOR_MAPJOIN.varname, false);
+    boolean hasUpstreamSinks = false;
+
+    // Check whether there's any upstream RS.
+    // If so, don't use TS stats because they could be inaccurate.
     for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      Set<ReduceSinkOperator> parentSinks =
+          OperatorUtils.findOperatorsUpstream(parentOp, ReduceSinkOperator.class);
+      parentSinks.remove(parentOp);
+      if (!parentSinks.isEmpty()) {
+        hasUpstreamSinks = true;
+      }
+    }
+
+    // If we are using TS stats and this JOIN has at least one upstream RS, disable MapJoin conversion.
+ if (useTsStats && hasUpstreamSinks) { + return new long[]{-1, 0, 0}; + } + + for (Operator parentOp : joinOp.getParentOperators()) { + Statistics currInputStat; + if (useTsStats) { + currInputStat = new Statistics(); + // Find all root TSs and add up all data sizes + // Not adding other stats (e.g., # of rows, col stats) since only data size is used here + for (TableScanOperator root : OperatorUtils.findOperatorsUpstream(parentOp, TableScanOperator.class)) { + currInputStat.addToDataSize(root.getStatistics().getDataSize()); + } + } else { + currInputStat = parentOp.getStatistics(); + } - Statistics currInputStat = parentOp.getStatistics(); if (currInputStat == null) { LOG.warn("Couldn't get statistics from: " + parentOp); return new long[]{-1, 0, 0}; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 7b2b3c0..36bde30 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; @@ -573,7 +574,7 @@ private static boolean hasGBYOperator(ReduceSinkOperator rs) { */ public BaseWork getEnclosingWork(Operator op, GenSparkProcContext procCtx) { List> ops = new ArrayList>(); - findRoots(op, ops); + OperatorUtils.findRoots(op, ops); for (Operator r : ops) { BaseWork work = procCtx.rootToWorkMap.get(r); if (work != null) { @@ -582,37 +583,4 @@ public BaseWork getEnclosingWork(Operator op, GenSparkProcContext procCtx) { } return null; } - - /* - * findRoots returns all root operators (in ops) that result in operator op - */ - private void findRoots(Operator op, List> ops) { - List> parents = op.getParentOperators(); - if (parents == null || parents.isEmpty()) { - ops.add(op); - return; - } - for (Operator p : parents) { - findRoots(p, ops); - } - } - - /** - * Remove the branch that contains the specified operator. Do nothing if there's no branching, - * i.e. all the upstream operators have only one child. 
- */ - public static void removeBranch(Operator op) { - Operator child = op; - Operator curr = op; - - while (curr.getChildOperators().size() <= 1) { - child = curr; - if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) { - return; - } - curr = curr.getParentOperators().get(0); - } - - curr.removeChild(child); - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 71528e8..c4b1640 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -169,7 +170,7 @@ private void removeDPPOperator(Set> component, OptimizeSparkProcCont return; } - GenSparkUtils.removeBranch(toRemove); + OperatorUtils.removeBranch(toRemove); // at this point we've found the fork in the op pipeline that has the pruning as a child plan. LOG.info("Disabling dynamic pruning for: " + toRemove.getConf().getTableScan().toString() + ". Needed to break cyclic dependency"); diff --git ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q new file mode 100644 index 0000000..b623b83 --- /dev/null +++ ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q @@ -0,0 +1,30 @@ +set hive.mapred.mode=nonstrict; +set hive.auto.convert.join=true; +set hive.spark.use.file.size.for.mapjoin=true; +set hive.auto.convert.join.noconditionaltask.size=4000; + +EXPLAIN +SELECT src1.key, src2.value +FROM src src1 JOIN src src2 ON (src1.key = src2.key) +WHERE src1.key = 97; + +SELECT src1.key, src2.value +FROM src src1 JOIN src src2 ON (src1.key = src2.key) +WHERE src1.key = 97; + +set hive.auto.convert.join.noconditionaltask.size=8000; + +-- This is copied from auto_join2. Without the configuration both joins are mapjoins, +-- but with the configuration on, Hive should not turn the second join into mapjoin since it +-- has a upstream reduce sink. 
+
+CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value;
+
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value;
+
+SELECT sum(hash(dest.key,dest.value)) FROM dest;
diff --git ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out
new file mode 100644
index 0000000..9044140
--- /dev/null
+++ ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out
@@ -0,0 +1,257 @@
+PREHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col2
+                Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col2 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+97	val_97
+97	val_97
+97	val_97
+97	val_97
+PREHOOK: query: CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dest
+POSTHOOK: query: CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dest
+PREHOOK: query: EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-1 depends on stages: Stage-3
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 3
+                        Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          key expressions: (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+                          sort order: +
+                          Map-reduce partition columns: (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+                          Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: src3
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: UDFToDouble(_col0) (type: double)
+                        sort order: +
+                        Map-reduce partition columns: UDFToDouble(_col0) (type: double)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+                  1 UDFToDouble(_col0) (type: double)
+                outputColumnNames: _col0, _col3
+                Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: UDFToInteger(_col0) (type: int), _col3 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                        name: default.dest
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.dest
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest
+POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest
+POSTHOOK: Lineage: dest.key EXPRESSION [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest.value SIMPLE [(src)src3.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT sum(hash(dest.key,dest.value)) FROM dest
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT sum(hash(dest.key,dest.value)) FROM dest
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest
+#### A masked pattern was here ####
+33815990627