Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1408864) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -120,6 +120,8 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes; +import org.apache.hadoop.hive.ql.plan.api.Adjacency; +import org.apache.hadoop.hive.ql.plan.api.Graph; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -226,6 +228,25 @@ } } + public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { + try { + Graph stageGraph = plan.getQueryPlan().getStageGraph(); + if (stageGraph == null) + return; + List adjList = stageGraph.getAdjacencyList(); + if (adjList == null) + return; + for (Adjacency adj : adjList) { + List children = adj.getChildren(); + if (children == null || children.isEmpty()) + return; + conf.setStrings("mapreduce.workflow.adjacency."+adj.getNode(), + children.toArray(new String[children.size()])); + } + } catch (IOException e) { + } + } + public static List getFieldSchemaString(List fl) { if (fl == null) { return null; Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1408864) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -1060,6 +1060,10 @@ conf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId); conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr); + + conf.set("mapreduce.workflow.id", "hive_"+queryId); + conf.set("mapreduce.workflow.name", queryStr); + maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER); try { @@ -1335,6 +1339,8 @@ if (noName) { conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")"); } + conf.set("mapreduce.workflow.node.name", tsk.getId()); + Utilities.setWorkflowAdjacencies(conf, plan); cxt.incCurJobNo(1); console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs); }