diff --git build.properties build.properties
index dd8b70e..6873648 100644
--- build.properties
+++ build.properties
@@ -29,7 +29,7 @@ javac.args.warnings=
 hadoop-0.20.version=0.20.2
 hadoop-0.20S.version=1.1.2
-hadoop-0.23.version=2.0.5-alpha
+hadoop-0.23.version=2.1.0-beta
 hadoop.version=${hadoop-0.20.version}
 hadoop.security.version=${hadoop-0.20S.version}
 # Used to determine which set of Hadoop artifacts we depend on.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 5559b2a..faa99f7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -238,9 +238,9 @@ private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceW
   /*
    * Helper function to create Vertex for given ReduceWork.
    */
-  private static Vertex creatVertex(JobConf conf, ReduceWork reduceWork, int seqNo,
+  private static Vertex createVertex(JobConf conf, ReduceWork reduceWork, int seqNo,
       LocalResource appJarLr, List additionalLr, FileSystem fs,
-      Path mrScratchDir, Context ctx) throws IOException {
+      Path mrScratchDir, Context ctx) throws Exception {

     // write out the operator plan
     Path planPath = Utilities.setReduceWork(conf, reduceWork,
@@ -404,16 +404,16 @@ public static JobConf initializeVertexConf(JobConf conf, BaseWork work) {
    */
   public static Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, int seqNo, LocalResource appJarLr, List additionalLr,
-      FileSystem fileSystem, Context ctx) {
+      FileSystem fileSystem, Context ctx) throws Exception {

     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
     if (work instanceof MapWork) {
-      return createVertex(conf, (MapWork) work, scratchDir, seqNo, appJarLr,
-          additionalLr, fileSystem, ctx);
+      return createVertex(conf, (MapWork) work, seqNo, appJarLr,
+          additionalLr, fileSystem, scratchDir, ctx);
     } else if (work instanceof ReduceWork) {
-      return createVertex(conf, (ReduceWork) work, scratchDir, seqNo, appJarLr,
-          additionalLr, fileSystem, ctx);
+      return createVertex(conf, (ReduceWork) work, seqNo, appJarLr,
+          additionalLr, fileSystem, scratchDir, ctx);
     } else {
       assert false;
       return null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index f22e614..ac536e2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -102,7 +102,7 @@ public int execute(DriverContext driverContext) {

   private DAG build(JobConf conf, TezWork work, Path scratchDir,
       LocalResource appJarLr, Context ctx)
-      throws IOException {
+      throws Exception {

     Map workToVertex = new HashMap();
     Map workToConf = new HashMap();
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 0408993..46c8b6d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -85,7 +85,7 @@
     LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
   }

-  private static boolean needsTagging(ReduceWork rWork) {
+  public static boolean needsTagging(ReduceWork rWork) {
     return rWork != null && (rWork.getReducer().getClass() == JoinOperator.class ||
          rWork.getReducer().getClass() == DemuxOperator.class);
   }

@@ -439,18 +439,38 @@ private static ReadEntity getParentViewInfo(String alias_id,
   public static void setTaskPlan(String alias_id,
       Operator topOp, Task task, boolean local,
       GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
-    MapWork plan = ((MapredWork) task.getWork()).getMapWork();
-    ParseContext parseCtx = opProcCtx.getParseCtx();
-    Set inputs = opProcCtx.getInputs();
+    setMapWork(((MapredWork) task.getWork()).getMapWork(), opProcCtx.getParseCtx(),
+        opProcCtx.getInputs(), pList, topOp, alias_id, opProcCtx.getConf(), local);
+    opProcCtx.addSeenOp(task, topOp);
+  }
+  /**
+   * initialize MapWork
+   *
+   * @param alias_id
+   *          current alias
+   * @param topOp
+   *          the top operator of the stack
+   * @param plan
+   *          map work to initialize
+   * @param local
+   *          whether you need to add to map-reduce or local work
+   * @param pList
+   *          pruned partition list. If it is null it will be computed on-the-fly.
+   * @param inputs
+   *          read entities for the map work
+   * @param conf
+   *          current instance of hive conf
+   */
+  public static void setMapWork(MapWork plan, ParseContext parseCtx, Set inputs,
+      PrunedPartitionList partsList, Operator topOp, String alias_id,
+      HiveConf conf, boolean local) throws SemanticException {

     ArrayList partDir = new ArrayList();
     ArrayList partDesc = new ArrayList();

     Path tblDir = null;
     TableDesc tblDesc = null;

-    PrunedPartitionList partsList = pList;
-
     plan.setNameToSplitSample(parseCtx.getNameToSplitSample());

     if (partsList == null) {
@@ -458,7 +478,7 @@ public static void setTaskPlan(String alias_id,
       partsList = parseCtx.getOpToPartList().get((TableScanOperator) topOp);
       if (partsList == null) {
         partsList = PartitionPruner.prune(parseCtx.getTopToTable().get(topOp),
-            parseCtx.getOpToPartPruner().get(topOp), opProcCtx.getConf(),
+            parseCtx.getOpToPartPruner().get(topOp), conf,
             alias_id, parseCtx.getPrunedPartitions());
         parseCtx.getOpToPartList().put((TableScanOperator) topOp, partsList);
       }
@@ -704,7 +724,6 @@ public static void setTaskPlan(String alias_id,
       }
       plan.setMapLocalWork(localPlan);
     }
-    opProcCtx.addSeenOp(task, topOp);
   }

   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index ecca44c..7ef4850 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -24,9 +24,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -82,7 +85,13 @@ public Object process(Node nd, Stack stack,
       assert root.getParentOperators().isEmpty();
       LOG.debug("Adding map work for " + root);
       MapWork mapWork = new MapWork();
-      mapWork.getAliasToWork().put("", root);
+
+      // map work starts with table scan operators
+      assert root instanceof TableScanOperator;
+      String alias = ((TableScanOperator)root).getConf().getAlias();
+
+      GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+          context.inputs, null, root, alias, context.conf, false);
       tezWork.add(mapWork);
       work = mapWork;
     } else {
@@ -90,6 +99,16 @@ public Object process(Node nd, Stack stack,
       LOG.debug("Adding reduce work for " + root);
       ReduceWork reduceWork = new ReduceWork();
       reduceWork.setReducer(root);
+      reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
+
+      // All parents should be reduce sinks. We pick the first to choose the
+      // number of reducers. In the join/union case they will all be -1. In
+      // sort/order case where it matters there will be only one parent.
+      assert root.getParentOperators().get(0) instanceof ReduceSinkOperator;
+      ReduceSinkOperator reduceSink
+        = (ReduceSinkOperator)root.getParentOperators().get(0);
+      reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+
       tezWork.add(reduceWork);
       tezWork.connect(
           context.preceedingWork,
diff --git ql/src/test/queries/clientpositive/mrr.q ql/src/test/queries/clientpositive/mrr.q
new file mode 100644
index 0000000..1bee97c
--- /dev/null
+++ ql/src/test/queries/clientpositive/mrr.q
@@ -0,0 +1,6 @@
+set hive.auto.convert.join=true;
+set hive.optimize.tez=true;
+EXPLAIN EXTENDED SELECT key, count(value) as cnt FROM src GROUP BY key ORDER BY cnt;
+EXPLAIN EXTENDED SELECT s2.key, count(distinct s2.value) as cnt FROM src s1 join src s2 on (s1.key = s2.key) GROUP BY s2.key ORDER BY cnt;
+
+SELECT key, count(value) as cnt FROM src GROUP BY key ORDER BY cnt;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/mrr.q.out ql/src/test/results/clientpositive/mrr.q.out
new file mode 100644
index 0000000..a8151e0
--- /dev/null
+++ ql/src/test/results/clientpositive/mrr.q.out
@@ -0,0 +1,386 @@
+PREHOOK: query: EXPLAIN EXTENDED SELECT key, count(value) as cnt FROM src GROUP BY key ORDER BY cnt
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN EXTENDED SELECT key, count(value) as cnt FROM src GROUP BY key ORDER BY cnt
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTION count (TOK_TABLE_OR_COL value)) cnt)) (TOK_GROUPBY (TOK_TABLE_OR_COL key)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL cnt)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Alias -> Map Operator Tree:
+        src
+          TableScan
+            alias: src
+            GatherStats: false
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: key, value
+              Group By Operator
+                aggregations:
+                      expr: count(value)
+                bucketGroup: false
+                keys:
+                      expr: key
+                      type: string
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: string
+                  sort order: +
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: string
+                  tag: -1
+                  value expressions:
+                        expr: _col1
+                        type: bigint
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              bucket_count -1
+              columns key,value
+              columns.types string:string
+#### A masked pattern was here ####
+              name default.src
+              numFiles 1
+              numPartitions 0
+              numRows 0
+              rawDataSize 0
+              serialization.ddl struct src { string key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              totalSize 5812
+#### A masked pattern was here ####
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                bucket_count -1
+                columns key,value
+                columns.types string:string
+#### A masked pattern was here ####
+                name default.src
+                numFiles 1
+                numPartitions 0
+                numRows 0
+                rawDataSize 0
+                serialization.ddl struct src { string key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                totalSize 5812
+#### A masked pattern was here ####
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.src
+            name: default.src
+      Truncated Path -> Alias:
+        /src [src]
+      Needs Tagging: false
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: string
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: bigint
+            outputColumnNames: _col0, _col1
+            Reduce Output Operator
+              key expressions:
+                    expr: _col1
+                    type: bigint
+              sort order: +
+              tag: -1
+              value expressions:
+                    expr: _col0
+                    type: string
+                    expr: _col1
+                    type: bigint
+      Needs Tagging: false
+      Reduce Operator Tree:
+        Extract
+          File Output Operator
+            compressed: false
+            GlobalTableId: 0
+#### A masked pattern was here ####
+            NumFilesPerFileSink: 1
+#### A masked pattern was here ####
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                properties:
+                  columns _col0,_col1
+                  columns.types string:bigint
+                  escape.delim \
+                  hive.serialization.extend.nesting.levels true
+                  serialization.format 1
+            TotalFiles: 1
+            GatherStats: false
+            MultiFileSpray: false
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: EXPLAIN EXTENDED SELECT s2.key, count(distinct s2.value) as cnt FROM src s1 join src s2 on (s1.key = s2.key) GROUP BY s2.key ORDER BY cnt
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN EXTENDED SELECT s2.key, count(distinct s2.value) as cnt FROM src s1 join src s2 on (s1.key = s2.key) GROUP BY s2.key ORDER BY cnt
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) s1) (TOK_TABREF (TOK_TABNAME src) s2) (= (. (TOK_TABLE_OR_COL s1) key) (. (TOK_TABLE_OR_COL s2) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL s2) key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (. (TOK_TABLE_OR_COL s2) value)) cnt)) (TOK_GROUPBY (. (TOK_TABLE_OR_COL s2) key)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL cnt)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Alias -> Map Operator Tree:
+        s2
+          TableScan
+            alias: s2
+            GatherStats: false
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: string
+              tag: 1
+              value expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              bucket_count -1
+              columns key,value
+              columns.types string:string
+#### A masked pattern was here ####
+              name default.src
+              numFiles 1
+              numPartitions 0
+              numRows 0
+              rawDataSize 0
+              serialization.ddl struct src { string key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              totalSize 5812
+#### A masked pattern was here ####
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                bucket_count -1
+                columns key,value
+                columns.types string:string
+#### A masked pattern was here ####
+                name default.src
+                numFiles 1
+                numPartitions 0
+                numRows 0
+                rawDataSize 0
+                serialization.ddl struct src { string key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                totalSize 5812
+#### A masked pattern was here ####
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.src
+            name: default.src
+      Truncated Path -> Alias:
+        /src [s2]
+      Alias -> Map Operator Tree:
+        s1
+          TableScan
+            alias: s1
+            GatherStats: false
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: string
+              tag: 0
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              bucket_count -1
+              columns key,value
+              columns.types string:string
+#### A masked pattern was here ####
+              name default.src
+              numFiles 1
+              numPartitions 0
+              numRows 0
+              rawDataSize 0
+              serialization.ddl struct src { string key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              totalSize 5812
+#### A masked pattern was here ####
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                bucket_count -1
+                columns key,value
+                columns.types string:string
+#### A masked pattern was here ####
+                name default.src
+                numFiles 1
+                numPartitions 0
+                numRows 0
+                rawDataSize 0
+                serialization.ddl struct src { string key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                totalSize 5812
+#### A masked pattern was here ####
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.src
+            name: default.src
+      Truncated Path -> Alias:
+        /src [s1]
+      Needs Tagging: true
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0
+            1 {VALUE._col0} {VALUE._col1}
+          handleSkewJoin: false
+          outputColumnNames: _col4, _col5
+          Select Operator
+            expressions:
+                  expr: _col4
+                  type: string
+                  expr: _col5
+                  type: string
+            outputColumnNames: _col4, _col5
+            Group By Operator
+              aggregations:
+                    expr: count(DISTINCT _col5)
+              bucketGroup: false
+              keys:
+                    expr: _col4
+                    type: string
+                    expr: _col5
+                    type: string
+              mode: hash
+              outputColumnNames: _col0, _col1, _col2
+              Reduce Output Operator
+                key expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+                sort order: ++
+                Map-reduce partition columns:
+                      expr: _col0
+                      type: string
+                tag: -1
+                value expressions:
+                      expr: _col2
+                      type: bigint
+      Needs Tagging: false
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(DISTINCT KEY._col1:0._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: string
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: bigint
+            outputColumnNames: _col0, _col1
+            Reduce Output Operator
+              key expressions:
+                    expr: _col1
+                    type: bigint
+              sort order: +
+              tag: -1
+              value expressions:
+                    expr: _col0
+                    type: string
+                    expr: _col1
+                    type: bigint
+      Needs Tagging: false
+      Reduce Operator Tree:
+        Extract
+          File Output Operator
+            compressed: false
+            GlobalTableId: 0
+#### A masked pattern was here ####
+            NumFilesPerFileSink: 1
+#### A masked pattern was here ####
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                properties:
+                  columns _col0,_col1
+                  columns.types string:bigint
+                  escape.delim \
+                  hive.serialization.extend.nesting.levels true
+                  serialization.format 1
+            TotalFiles: 1
+            GatherStats: false
+            MultiFileSpray: false
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
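
Reviewer note (not part of the patch): the diff changes DagUtils.createVertex(JobConf, BaseWork, ...) and TezTask.build(...) to throw Exception, but the surrounding DAG-assembly loop in TezTask is not shown. The sketch below is only an illustration of how the pieces are expected to fit together: one vertex-level JobConf per BaseWork via initializeVertexConf(), one Vertex via the createVertex() dispatcher, collected into the workToVertex/workToConf maps that build() declares. The class name and the TezWork traversal accessor (getAllWork()) are assumed names used for illustration; only the DagUtils signatures are taken from the diff.

// Illustrative sketch only -- not part of the patch.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.Vertex;

public class DagAssemblySketch {

  // Mirrors the shape of TezTask.build(), which now declares "throws Exception"
  // because the createVertex() dispatcher does.
  public static Map<BaseWork, Vertex> buildVertices(JobConf conf, TezWork tezWork,
      Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr,
      FileSystem fs, Context ctx) throws Exception {

    Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
    Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();

    int seqNo = 0;
    for (BaseWork work : tezWork.getAllWork()) {   // getAllWork(): assumed accessor
      // per-vertex configuration, then vertex creation; createVertex() dispatches
      // on MapWork vs. ReduceWork internally (see the DagUtils hunk above)
      JobConf vertexConf = DagUtils.initializeVertexConf(conf, work);
      Vertex vertex = DagUtils.createVertex(vertexConf, work, scratchDir,
          ++seqNo, appJarLr, additionalLr, fs, ctx);
      workToVertex.put(work, vertex);
      workToConf.put(work, vertexConf);
    }
    // edges would then be added for each pair of BaseWorks joined by TezWork.connect()
    return workToVertex;
  }
}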