diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 9c808d4..4b316d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -947,6 +947,8 @@ public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSc
    * A FileSinkOperator is added after parent to output the data.
    * Before child, we add a TableScanOperator to load data stored in the temporary
    * file back.
+   *
+   * TODO: for Hive on Spark we changed this to public, but ideally it should be changed back to protected.
    * @param parent
    * @param child
    * @param taskTmpDir
@@ -954,7 +956,7 @@ public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSc
    * @param parseCtx
    * @return The TableScanOperator inserted before child.
    */
-  protected static TableScanOperator createTemporaryFile(
+  public static TableScanOperator createTemporaryFile(
       Operator<? extends OperatorDesc> parent, Operator<? extends OperatorDesc> child,
       Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 5ddc16d..c652657 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.parse.spark;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -32,10 +34,10 @@
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -43,16 +45,16 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 /**
- * GenSparkProcContext maintains information about the tasks and operators 
+ * GenSparkProcContext maintains information about the tasks and operators
  * as we walk the operator tree to break them into SparkTasks.
- * 
+ *
  * Cloned from GenTezProcContext.
  *
  */
@@ -81,10 +83,23 @@
   // Spark task we're currently processing
   public SparkTask currentTask;
 
+  // the default task (as opposed to the extra tasks generated for multi-table insertion)
+  public SparkTask defaultTask;
+
   // last work we've processed (in order to hook it up to the current
   // one.
   public BaseWork preceedingWork;
 
+  // all SELs that we should unlink from their parents, for multi-table insertion
+  public Set<Operator<?>> selectOpsToUnlink;
+
+  // a mapping from operators to their corresponding tasks.
+  // The keys of this map can only be either:
+  //   1. generated temporary TableScanOperators (so that we can use a
+  //      different task than the default one)
+  //   2. FileSinkOperators (this info is needed in GenSparkUtils::processFileSink)
+  public final Map<Operator<?>, SparkTask> opToTaskTable;
+
   // map that keeps track of the last operator of a task to the work
   // that follows it. This is used for connecting them later.
   public final Map<Operator<?>, BaseWork> leafOperatorToFollowingWork;
@@ -142,8 +157,10 @@ public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
     this.rootTasks = rootTasks;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.currentTask = (SparkTask) TaskFactory.get(
-        new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+    this.currentTask = null;
+    this.defaultTask = (SparkTask) TaskFactory.get(
+        new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+    rootTasks.add(defaultTask);
     this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
@@ -162,8 +179,7 @@ public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
     this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
-
-    rootTasks.add(currentTask);
+    this.selectOpsToUnlink = new HashSet<Operator<?>>();
+    this.opToTaskTable = new HashMap<Operator<?>, SparkTask>();
   }
-
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 379a39c..1f0dcde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -34,14 +34,31 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.*;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -157,6 +174,14 @@ public MapWork createMapWork(GenSparkProcContext context, Operator<? extends Ope
     return mapWork;
   }
 
+  public MapWork createMapWork(GenSparkProcContext context, TableScanOperator root,
+      SparkWork sparkWork, String path, TableDesc tt_desc) throws SemanticException {
+    MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
+    GenMapRedUtils.setTaskPlan(path, path, root, mapWork, false, tt_desc);
+    sparkWork.add(mapWork);
+    return mapWork;
+  }
+
   // this method's main use is to help unit testing this class
   protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
       PrunedPartitionList partitions, Operator<? extends OperatorDesc> root,
@@ -216,7 +241,9 @@ public void removeUnionOperators(Configuration conf, GenSparkProcContext context
       FileSinkOperator fileSink = (FileSinkOperator)current;
 
       // remember it for additional processing later
-      context.fileSinkSet.add(fileSink);
+      // This line generates duplicated FileSinks, which causes the problem described in HIVE-7870.
+      // Please correct me if this is NOT a bug.
+      // context.fileSinkSet.add(fileSink);
 
       FileSinkDesc desc = fileSink.getConf();
       Path path = desc.getDirName();
@@ -269,15 +296,19 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
       throws SemanticException {
 
     ParseContext parseContext = context.parseContext;
+    Preconditions.checkArgument(context.opToTaskTable.containsKey(fileSink),
+        "AssertionError: the fileSink " + fileSink.getName() + " should be in the context");
+
+    SparkTask currentTask = context.opToTaskTable.get(fileSink);
 
     boolean isInsertTable = // is INSERT OVERWRITE TABLE
         GenMapRedUtils.isInsertInto(parseContext, fileSink);
     HiveConf hconf = parseContext.getConf();
 
     boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
-        hconf, fileSink, context.currentTask, isInsertTable);
+        hconf, fileSink, currentTask, isInsertTable);
 
-    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+    Path finalName = GenMapRedUtils.createMoveTask(currentTask,
         chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
 
     if (chDir) {
@@ -286,13 +317,13 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
       logger.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
+          hconf, currentTask);
     }
 
     FetchTask fetchTask = parseContext.getFetchTask();
-    if (fetchTask != null && context.currentTask.getNumChild() == 0) {
+    if (fetchTask != null && currentTask.getNumChild() == 0) {
       if (fetchTask.isFetchFrom(fileSink.getConf())) {
-        context.currentTask.setFetchSource(true);
+        currentTask.setFetchSource(true);
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index 864965e..0109d7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -50,7 +50,7 @@
  * and break the operators into work and tasks along the way.
  *
  * Cloned from GenTezWork.
- * 
+ *
  * TODO: need to go thru this to make it fit completely to Spark.
  */
 public class GenSparkWork implements NodeProcessor {
@@ -95,6 +95,10 @@ public Object process(Node nd, Stack<Node> stack,
       return null;
     }
 
+    if (operator instanceof FileSinkOperator) {
+      context.opToTaskTable.put(operator, context.currentTask);
+    }
+
     SparkWork sparkWork = context.currentTask.getWork();
 
     // Right now the work graph is pretty simple. If there is no
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 76fc290..11be355 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -72,9 +73,9 @@
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
- * 
+ *
  * Pretty much cloned from TezCompiler.
- * 
+ *
  * TODO: need to complete and make it fit to Spark.
  */
 public class SparkCompiler extends TaskCompiler {
@@ -86,7 +87,7 @@ public SparkCompiler() {
   @Override
   public void init(HiveConf conf, LogHelper console, Hive db) {
     super.init(conf, console, db);
-    
+
     // TODO: Need to check if we require the use of recursive input dirs for union processing
     // conf.setBoolean("mapred.input.dir.recursive", true);
     // HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
@@ -139,9 +140,25 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
     GenSparkProcContext procCtx = new GenSparkProcContext(
         conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
 
+    // -------------------- First Pass ---------------------
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
+        new SparkTableScanProcessor());
+    opRules.put(new RuleRegExp("SEL", SelectOperator.getOperatorName() + "%"),
+        new SparkMultiInsertionProcessor());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+    ogw.startWalking(topNodes, null);
+
+    // ------------------- Second Pass ----------------------
+
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack. The dispatcher generates the plan from the operator tree
-    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("Split Work - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"),
         genSparkWork);
@@ -154,28 +171,48 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
 
     opRules.put(new RuleRegExp("Handle Analyze Command",
         TableScanOperator.getOperatorName() + "%"),
-        new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
-
-    opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
-        new NodeProcessor() {
-          @Override
-          public Object process(Node n, Stack<Node> s,
-              NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-            GenSparkProcContext context = (GenSparkProcContext) procCtx;
-            UnionOperator union = (UnionOperator) n;
-
-            // simply need to remember that we've seen a union.
-            context.currentUnionOperators.add(union);
-            return null;
+        new CompositeProcessor(
+            new NodeProcessor() {
+              @Override
+              public Object process(Node nd, Stack<Node> s,
+                  NodeProcessorCtx procCtx, Object... no) throws SemanticException {
+                GenSparkProcContext context = (GenSparkProcContext) procCtx;
+                if (context.opToTaskTable.containsKey(nd)) {
+                  context.currentTask = context.opToTaskTable.get(nd);
+                } else {
+                  context.currentTask = context.defaultTask;
+                }
+                return null;
+              }
+            },
+            new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())));
+
+    opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
+        new NodeProcessor() {
+          @Override
+          public Object process(Node n, Stack<Node> s,
+              NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+            GenSparkProcContext context = (GenSparkProcContext) procCtx;
+            UnionOperator union = (UnionOperator) n;
+
+            // simply need to remember that we've seen a union.
+            context.currentUnionOperators.add(union);
+            return null;
+          }
           }
-        });
+    );
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
-    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    List<Node> topNodes = new ArrayList<Node>();
+    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
-    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+    for (Operator<?> op : procCtx.opToTaskTable.keySet()) {
+      if (op instanceof TableScanOperator) {
+        topNodes.add(op);
+      }
+    }
+    ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
     // we need to clone some operator plans and remove union operators still
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
new file mode 100644
index 0000000..b2dd249
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+import java.util.Stack;
+
+
+public class SparkMultiInsertionProcessor implements NodeProcessor {
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+    SelectOperator selOp = (SelectOperator) nd;
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+
+    if (context.selectOpsToUnlink.contains(selOp)) {
+      splitPlan(selOp, context);
+      context.selectOpsToUnlink.remove(selOp);
+    }
+
+    return null;
+  }
+
+  /**
+   * Split the plan at the given SELECT operator so that its subtree goes into a new task.
+   * @param selOp the select operator encountered
+   * @param context processing context
+   */
+  private void splitPlan(SelectOperator selOp, GenSparkProcContext context)
+      throws SemanticException {
+    // Generate a new task. Nested multi-table insertions are not possible,
+    // so the parent is always the default task.
+    SparkTask parentTask = context.defaultTask;
+    SparkTask childTask = (SparkTask) TaskFactory.get(
+        new SparkWork(context.conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), context.conf);
+    splitTasks(selOp, parentTask, childTask, context);
+  }
+
+  /**
+   * Split the plan into parent and child tasks by creating a temporary file between them.
+   *
+   * @param selOp select operator being processed
+   * @param parentTask the parent task
+   * @param childTask the child task
+   * @param context processing context
+   */
+  @SuppressWarnings("nls")
+  private void splitTasks(SelectOperator selOp,
+      SparkTask parentTask, SparkTask childTask,
+      GenSparkProcContext context) throws SemanticException {
+    Preconditions.checkArgument(selOp.getNumParent() == 1,
+        "AssertionError: expecting operator " + selOp + " to have only one parent," +
+        " but found multiple parents : " + selOp.getParentOperators());
+
+    GenSparkUtils utils = GenSparkUtils.getUtils();
+    ParseContext parseCtx = context.parseContext;
+    parentTask.addDependentTask(childTask);
+
+    // Generate the temporary file name
+    Context baseCtx = parseCtx.getContext();
+    Operator<?> parent = selOp.getParentOperators().get(0);
+    Path taskTmpDir = baseCtx.getMRTmpPath();
+    TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+        .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
+
+    TableScanOperator tableScan = GenMapRedUtils.createTemporaryFile(
+        parent, selOp, taskTmpDir, tt_desc, parseCtx);
+
+    String streamDesc = taskTmpDir.toUri().toString();
+    context.opToTaskTable.put(tableScan, childTask);
+    MapWork mapWork = utils.createMapWork(context, tableScan,
+        childTask.getWork(), streamDesc, tt_desc);
+    context.rootToWorkMap.put(tableScan, mapWork);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
new file mode 100644
index 0000000..b126dd6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.List;
+import java.util.Stack;
+
+public class SparkTableScanProcessor implements NodeProcessor {
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+    TableScanOperator tblScan = (TableScanOperator) nd;
+
+    // For multi-table insertion, starting from this TS, we do a DFS to find the first operator that:
+    //   1. has multiple children, and
+    //   2. all of whose children are SelectOperators.
+    //
+    // That is, in scenarios like the following:
+    //
+    //          OP (TS, UNION, etc.)
+    //         /                   \
+    //       SEL        ..        SEL
+    //
+    // If we find such an operator, we record all of its children (SELs) in the context and unlink
+    // them from this operator later, in SparkMultiInsertionProcessor, after which the plan becomes:
+    //
+    //          OP (TS, UNION, etc.)
+    //         /                   \
+    //        FS                   FS
+    //
+    //        TS                   TS
+    //        |                    |
+    //       SEL                  SEL
+    //
+    // where the two branches starting with TS are in different Spark tasks.
+    Stack<Operator<? extends OperatorDesc>> path = new Stack<Operator<? extends OperatorDesc>>();
+    path.push(tblScan);
+    while (!path.isEmpty()) {
+      Operator<? extends OperatorDesc> op = path.pop();
+      List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
+      if (childOps.size() > 1) {
+        boolean allSelects = true;
+        for (Operator<? extends OperatorDesc> childOp : childOps) {
+          if (!(childOp instanceof SelectOperator)) {
+            allSelects = false;
+            break;
+          }
+        }
+        if (allSelects) {
+          context.selectOpsToUnlink.addAll(childOps);
+          break;
+        }
+      }
+
+      for (Operator<? extends OperatorDesc> childOp : childOps) {
+        path.push(childOp);
+      }
+    }
+
+    return null;
+  }
+}
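
For reference, the query shape this patch targets is a HiveQL multi-insert: one scan of a source table feeding several INSERT branches. The first pass marks the SELECT branches, and SparkMultiInsertionProcessor then gives each branch its own dependent SparkTask that re-reads a temporary file written by the default task. The sketch below is illustrative only and is not part of the patch; the table names (src, t1, t2), the standalone main method, and a working metastore and warehouse setup are assumptions.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class MultiInsertOnSparkExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Route execution through the Spark path so that SparkCompiler, and with it the
    // two walker passes added above, is used instead of the MapReduce compiler.
    conf.set("hive.execution.engine", "spark");
    SessionState.start(conf);

    Driver driver = new Driver(conf);
    // One TableScan over src with two SELECT branches: SparkTableScanProcessor records
    // both SELs in selectOpsToUnlink, and SparkMultiInsertionProcessor splits each of
    // them into its own SparkTask behind the intermediate file.
    driver.run("FROM src"
        + " INSERT OVERWRITE TABLE t1 SELECT key"
        + " INSERT OVERWRITE TABLE t2 SELECT value");
  }
}

With the changes above, such a statement should compile into the default SparkTask for the shared scan plus one dependent SparkTask per insertion branch, which is the task layout that processFileSink now looks up through opToTaskTable.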