commit 1a9e6c5e520476aa67f556d80d0a867a7f9d2a4e
Author: Janaki Lahorani
Date:   Tue Aug 28 11:44:52 2018 -0700

    HIVE-20489: Fix graph traversal of task graph to be iterative

diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index d887124c91c3d20bc2d3d0283d56d20b4ff8a75e..e3ae0bf0f3d1fae7b361c642f89c3fd46a644965 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
@@ -894,64 +895,66 @@ public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> ta
   }
 
   /**
-   * Called at the end of TaskCompiler::compile to derive final
-   * explain attributes based on previous compilation.
+   * Called at the end of TaskCompiler::compile.
+   * This currently does the following for each MapWork:
+   *   1. Intern the table descriptors of the partitions.
+   *   2. Derive the final explain attributes based on previous compilation.
+   *
+   * The original implementation had two functions, internTableDesc and
+   * deriveFinalExplainAttributes, implementing 1 and 2 above respectively. Both recursed
+   * over the task graph, which was inefficient in a couple of ways:
+   *   - For large graphs the recursion filled up the stack.
+   *   - Instead of locating the MapWorks directly, it walked every possible path from the
+   *     roots, causing a huge performance problem.
+   *
+   * This implementation combines internTableDesc and deriveFinalExplainAttributes into one
+   * call. This can be done because each only refers to information within a MapWork and
+   * performs a specific action on it.
+   *
+   * The revised implementation collects the MapWorks of all MapReduce tasks (getMRTasks),
+   * Spark tasks (getSparkTasks) and Tez tasks (getTezTasks), and then invokes the respective
+   * calls on each of them. getMRTasks, getSparkTasks and getTezTasks walk the task graph
+   * iteratively to find the respective tasks.
+   *
+   * The iterative implementation of these functions was done as part of HIVE-17195. Before
+   * HIVE-17195 these functions were recursive and had the same issue, so picking this patch
+   * for an older release will also require picking HIVE-17195 at the least.
    */
-  public static void deriveFinalExplainAttributes(
-      Task<? extends Serializable> task, Configuration conf) {
-    // TODO: deriveExplainAttributes should be called here, code is too fragile to move it around.
-    if (task instanceof ConditionalTask) {
-      for (Task<? extends Serializable> tsk : ((ConditionalTask) task).getListTasks()) {
-        deriveFinalExplainAttributes(tsk, conf);
-      }
-    } else if (task instanceof ExecDriver) {
-      MapredWork work = (MapredWork) task.getWork();
-      work.getMapWork().deriveLlap(conf, true);
-    } else if (task != null && (task.getWork() instanceof TezWork)) {
-      TezWork work = (TezWork)task.getWork();
-      for (BaseWork w : work.getAllWorkUnsorted()) {
-        if (w instanceof MapWork) {
-          ((MapWork)w).deriveLlap(conf, false);
-        }
-      }
-    } else if (task instanceof SparkTask) {
-      SparkWork work = (SparkWork) task.getWork();
-      for (BaseWork w : work.getAllWorkUnsorted()) {
-        if (w instanceof MapWork) {
-          ((MapWork) w).deriveLlap(conf, false);
-        }
+  public static void finalMapWorkChores(
+      List<Task<? extends Serializable>> tasks, Configuration conf,
+      Interner<TableDesc> interner) {
+    List<ExecDriver> mrTasks = Utilities.getMRTasks(tasks);
+    if (!mrTasks.isEmpty()) {
+      for (ExecDriver execDriver : mrTasks) {
+        execDriver.getWork().getMapWork().internTable(interner);
+        execDriver.getWork().getMapWork().deriveLlap(conf, true);
+      }
       }
     }
-    if (task.getChildTasks() == null) {
-      return;
-    }
-
-    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-      deriveFinalExplainAttributes(childTask, conf);
-    }
-  }
-
-  public static void internTableDesc(Task<? extends Serializable> task, Interner<TableDesc> interner) {
-
-    if (task instanceof ConditionalTask) {
-      for (Task<? extends Serializable> tsk : ((ConditionalTask) task).getListTasks()) {
-        internTableDesc(tsk, interner);
-      }
-    } else if (task instanceof ExecDriver) {
-      MapredWork work = (MapredWork) task.getWork();
-      work.getMapWork().internTable(interner);
-    } else if (task != null && (task.getWork() instanceof TezWork)) {
-      TezWork work = (TezWork)task.getWork();
-      for (BaseWork w : work.getAllWorkUnsorted()) {
-        if (w instanceof MapWork) {
-          ((MapWork)w).internTable(interner);
+    List<TezTask> tezTasks = Utilities.getTezTasks(tasks);
+    if (!tezTasks.isEmpty()) {
+      for (TezTask tezTask : tezTasks) {
+        if (tezTask.getWork() instanceof TezWork) {
+          TezWork work = tezTask.getWork();
+          for (BaseWork w : work.getAllWorkUnsorted()) {
+            if (w instanceof MapWork) {
+              ((MapWork)w).internTable(interner);
+              ((MapWork)w).deriveLlap(conf, false);
+            }
+          }
         }
       }
     }
-    if (task.getNumChild() > 0) {
-      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-        internTableDesc(childTask, interner);
+
+    List<SparkTask> sparkTasks = Utilities.getSparkTasks(tasks);
+    if (!sparkTasks.isEmpty()) {
+      for (SparkTask sparkTask : sparkTasks) {
+        SparkWork work = sparkTask.getWork();
+        for (BaseWork w : work.getAllWorkUnsorted()) {
+          if (w instanceof MapWork) {
+            ((MapWork) w).deriveLlap(conf, false);
+          }
+        }
       }
     }
   }

diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 005e7b6bb0b839f4cbbc148701843c7cbc240860..baefe1b39eb1936f8a5dd1e2be5bbf3bfb73d63b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -359,10 +359,11 @@ public void compile(final ParseContext pCtx,
     }
 
     Interner<TableDesc> interner = Interners.newStrongInterner();
-    for (Task<? extends Serializable> rootTask : rootTasks) {
-      GenMapRedUtils.internTableDesc(rootTask, interner);
-      GenMapRedUtils.deriveFinalExplainAttributes(rootTask, pCtx.getConf());
-    }
+
+    // Perform final chores on the generated MapWorks:
+    // 1. Intern the table descriptors.
+    // 2. Derive final explain attributes based on previous compilation.
+    GenMapRedUtils.finalMapWorkChores(rootTasks, pCtx.getConf(), interner);
   }
 
   private String extractTableFullName(StatsTask tsk) throws SemanticException {
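As background for the commit message above, the sketch below shows the shape of an iterative, worklist-based walk over a task DAG, in the spirit of the HIVE-17195 helpers (Utilities.getMRTasks, getTezTasks, getSparkTasks) that finalMapWorkChores builds on. It is only an illustration, not the Hive code: TaskNode, TaskGraphWalker and collectReachable are hypothetical stand-ins for Hive's Task<? extends Serializable> graph. The point is that an explicit deque replaces the call stack (so deep graphs cannot overflow it) and a visited set ensures each task is processed once even when it is reachable through many paths.

// Minimal sketch, assuming a toy TaskNode type; not the Hive implementation.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TaskNode {
  final String name;
  final List<TaskNode> children = new ArrayList<>();
  TaskNode(String name) { this.name = name; }
}

final class TaskGraphWalker {
  // Collects every task reachable from the given roots exactly once.
  // The deque replaces the recursion's call stack; the visited set prevents
  // re-walking shared sub-graphs, which is what made the old recursive walk
  // explore every path from the roots.
  static List<TaskNode> collectReachable(List<TaskNode> roots) {
    List<TaskNode> result = new ArrayList<>();
    Set<TaskNode> visited = new HashSet<>();
    Deque<TaskNode> pending = new ArrayDeque<>(roots);
    while (!pending.isEmpty()) {
      TaskNode current = pending.poll();
      if (!visited.add(current)) {
        continue; // already reached through another parent
      }
      result.add(current);
      pending.addAll(current.children);
    }
    return result;
  }

  public static void main(String[] args) {
    // Diamond-shaped graph: root -> {a, b} -> join. A naive recursive walk visits
    // "join" once per path; the worklist walk visits it exactly once.
    TaskNode root = new TaskNode("root");
    TaskNode a = new TaskNode("a");
    TaskNode b = new TaskNode("b");
    TaskNode join = new TaskNode("join");
    root.children.add(a);
    root.children.add(b);
    a.children.add(join);
    b.children.add(join);
    for (TaskNode t : collectReachable(List.of(root))) {
      System.out.println(t.name); // prints root, a, b, join
    }
  }
}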