diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index ec25713906..c7c77ab016 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1587,6 +1587,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning_4.q,\
   spark_dynamic_partition_pruning_5.q,\
   spark_dynamic_partition_pruning_6.q,\
+  spark_dynamic_partition_pruning_7.q,\
   spark_dynamic_partition_pruning_mapjoin_only.q,\
   spark_constprog_dpp.q,\
   spark_dynamic_partition_pruning_recursive_mapjoin.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index ed889fad2d..d3b87de045 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -32,6 +32,7 @@
 import com.clearspring.analytics.util.Preconditions;
 import javolution.testing.AssertionException;
 
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -174,7 +175,7 @@ private void prunePartitionSingleSource(SourceInfo info, MapWork work)
       throws HiveException {
     Set<Object> values = info.values;
     // strip the column name of the targetId
-    String columnName = info.columnName.substring(info.columnName.indexOf(':') + 1);
+    String columnName = SparkPartitionPruningSinkDesc.stripOfTargetId(info.columnName);
 
     ObjectInspector oi =
         PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 74f03686e0..77ebb4c932 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -18,25 +18,30 @@
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
+import java.util.stream.Collectors;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -44,7 +49,6 @@
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -61,14 +65,14 @@ */
 public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
   protected static transient Logger LOG = LoggerFactory.getLogger(CombineEquivalentWorkResolver.class);
-  private List<String> removedMapWorkNames = new ArrayList<>();
   private PhysicalContext pctx;
 
   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
     this.pctx = pctx;
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getRootTasks());
-    TaskGraphWalker taskWalker = new TaskGraphWalker(new EquivalentWorkMatcher());
+    // use a pre-order walker so that DPP sink works are visited (and combined) first
+    GraphWalker taskWalker = new PreOrderWalker(new EquivalentWorkMatcher());
     HashMap<Node, Object> nodeOutput = Maps.newHashMap();
     taskWalker.startWalking(topNodes, nodeOutput);
     return pctx;
@@ -82,25 +86,19 @@ public int compare(BaseWork o1, BaseWork o2) {
       }
     };
 
-    // maps from a work to the DPPs it contains
+    // maps from a work to the DPPs it contains -- used to combine equivalent DPP sinks
    private Map<BaseWork, List<SparkPartitionPruningSinkOperator>> workToDpps = new HashMap<>();
 
+    // maps from unique id to DPP sink -- used to update the DPP sinks when
+    // target map works are removed
+    private Map<String, SparkPartitionPruningSinkOperator> idToDpps = new HashMap<>();
+
     @Override
     public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
       if (nd instanceof SparkTask) {
         SparkTask sparkTask = (SparkTask) nd;
         SparkWork sparkWork = sparkTask.getWork();
-        // For dpp case, dpp sink will appear in Task1 and the target work of dpp sink will appear in Task2.
-        // Task2 is the child task of Task1. Task2 will be traversed before task1 because TaskGraphWalker will first
-        // put children task in the front of task queue.
-        // If a spark work which is equal to other is found and removed in Task2, the dpp sink can be removed when Task1
-        // is traversed(More detailed see HIVE-16948)
-        if (removedMapWorkNames.size() > 0) {
-          removeDynamicPartitionPruningSink(removedMapWorkNames, sparkWork);
-          if (sparkWork.getAllWork().size() == 0) {
-            removeEmptySparkTask(sparkTask);
-          }
-        }
+        collectDPPInfos(sparkWork);
 
         Set<BaseWork> roots = sparkWork.getRoots();
         compareWorksRecursively(roots, sparkWork);
@@ -108,6 +106,19 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws
       return null;
     }
 
+    private void collectDPPInfos(SparkWork sparkWork) {
+      for (BaseWork work : sparkWork.getAllWork()) {
+        Set<Operator<?>> seen = new HashSet<>();
+        for (Operator<?> root : work.getAllRootOperators()) {
+          List<SparkPartitionPruningSinkOperator> sinks = new ArrayList<>();
+          SparkUtilities.collectOp(root, SparkPartitionPruningSinkOperator.class, sinks, seen);
+          for (SparkPartitionPruningSinkOperator sink : sinks) {
+            idToDpps.put(sink.getUniqueId(), sink);
+          }
+        }
+      }
+    }
+
     private void compareWorksRecursively(Set<BaseWork> works, SparkWork sparkWork) {
       workToDpps.clear();
       // find out all equivalent works in the Set.
@@ -173,6 +184,7 @@ private boolean belongToSet(Set<BaseWork> set, BaseWork work, SparkWor
           // equivalent works must have dpp lists of same size
           for (int i = 0; i < dppList1.size(); i++) {
             combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i));
+            idToDpps.remove(dppList2.get(i).getUniqueId());
           }
         }
         replaceWork(next, first, sparkWork);
@@ -203,7 +215,22 @@ private void replaceWork(BaseWork previous, BaseWork current, SparkWork sparkWor
       sparkWork.remove(previous);
       // In order to fix HIVE-16948
       if (previous instanceof MapWork) {
-        removedMapWorkNames.add(previous.getName());
+        removeTargetFromDPP((MapWork) previous);
+      }
+    }
+
+    // Remove the map work from DPP sinks that have it as a target
+    private void removeTargetFromDPP(MapWork target) {
+      Set<String> dppIds = target.getEventSourceColumnNameMap().keySet();
+      for (String dppId : dppIds) {
+        SparkPartitionPruningSinkOperator sink = idToDpps.get(dppId);
+        Preconditions.checkNotNull(sink, "Unable to find DPP sink whose target work is removed.");
+        SparkPartitionPruningSinkDesc desc = sink.getConf();
+        desc.removeTarget(target.getName());
+        // If the target can be removed, it means there's another MapWork that shares the same
+        // DPP sink, and therefore it cannot be the only target.
+        Preconditions.checkState(!desc.getTargetInfos().isEmpty(),
+            "The removed target work is the only target.");
       }
     }
 
@@ -279,6 +306,10 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork) {
     }
 
     private boolean compareMapWork(MapWork first, MapWork second) {
+      return hasSamePathToPartition(first, second) && targetsOfSameDPPSink(first, second);
+    }
+
+    private boolean hasSamePathToPartition(MapWork first, MapWork second) {
       Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
       Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
       if (pathToPartition1.size() == pathToPartition2.size()) {
@@ -295,6 +326,49 @@ private boolean compareMapWork(MapWork first, MapWork second) {
       return false;
     }
 
+    // Checks whether two MapWorks will be pruned by the same DPP sink
+    private boolean targetsOfSameDPPSink(MapWork first, MapWork second) {
+      Set<String> sources1 = first.getEventSourceColumnNameMap().keySet();
+      Set<String> sources2 = second.getEventSourceColumnNameMap().keySet();
+      if (!sources1.equals(sources2)) {
+        return false;
+      }
+      // check whether each DPP sink targets the same columns
+      for (String source : sources1) {
+        Set<String> names1 = first.getEventSourceColumnNameMap().get(source).stream().map(
+            SparkPartitionPruningSinkDesc::stripOfTargetId).collect(Collectors.toSet());
+        Set<String> names2 = second.getEventSourceColumnNameMap().get(source).stream().map(
+            SparkPartitionPruningSinkDesc::stripOfTargetId).collect(Collectors.toSet());
+        if (!names1.equals(names2)) {
+          return false;
+        }
+
+        Set<String> types1 = new HashSet<>(first.getEventSourceColumnTypeMap().get(source));
+        Set<String> types2 = new HashSet<>(second.getEventSourceColumnTypeMap().get(source));
+        if (!types1.equals(types2)) {
+          return false;
+        }
+
+        Set<TableDesc> tableDescs1 = new HashSet<>(first.getEventSourceTableDescMap().get(source));
+        Set<TableDesc> tableDescs2 = new HashSet<>(second.getEventSourceTableDescMap().get(source));
+        if (!tableDescs1.equals(tableDescs2)) {
+          return false;
+        }
+
+        List<ExprNodeDesc> descs1 = first.getEventSourcePartKeyExprMap().get(source);
+        List<ExprNodeDesc> descs2 = second.getEventSourcePartKeyExprMap().get(source);
+        if (descs1.size() != descs2.size()) {
+          return false;
+        }
+        for (ExprNodeDesc desc : descs1) {
+          if (descs2.stream().noneMatch(d -> ExprNodeDescUtils.isSame(d, desc))) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
     private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
       boolean result = true;
       List<BaseWork> firstParents = sparkWork.getParents(first);
@@ -358,59 +432,6 @@ private boolean compareOperatorChain(Operator firstOperator, Operator seco
     private boolean compareCurrentOperator(Operator<?> firstOperator, Operator<?> secondOperator) {
       return firstOperator.logicalEquals(secondOperator);
     }
-
-    /**
-     * traverse the children in sparkWork to find the dpp sink operator which target work is included in
-     * removedMapWorkList
-     * If there is branch, remove prune sink operator branch in the BaseWork
-     * If there is no branch, remove the whole BaseWork
-     *
-     * @param removedMapWorkList: the name of the map work has been deleted because they are equals to other works.
-     * @param sparkWork: current spark work
-     */
-    private void removeDynamicPartitionPruningSink(List<String> removedMapWorkList, SparkWork sparkWork) {
-      List<BaseWork> allWorks = sparkWork.getAllWork();
-      for (BaseWork baseWork : allWorks) {
-        Set<Operator<?>> rootOperators = baseWork.getAllRootOperators();
-        for (Operator<?> root : rootOperators) {
-          List<Operator<?>> pruningList = new ArrayList<>();
-          SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
-          for (Operator<?> pruneSinkOp : pruningList) {
-            SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
-            for (String removedName : removedMapWorkList) {
-              sparkPruneSinkOp.getConf().removeTarget(removedName);
-            }
-            if (sparkPruneSinkOp.getConf().getTargetInfos().isEmpty()) {
-              LOG.debug("ready to remove the sparkPruneSinkOp which target work is " +
-                  sparkPruneSinkOp.getConf().getTargetWorks() + " because the MapWork is equals to other map work and " +
-                  "has been deleted!");
-              // If there is branch, remove prune sink operator branch in the baseWork
-              // If there is no branch, remove the whole baseWork
-              if (OperatorUtils.isInBranch(sparkPruneSinkOp)) {
-                OperatorUtils.removeBranch(sparkPruneSinkOp);
-              } else {
-                sparkWork.remove(baseWork);
-              }
-            }
-          }
-        }
-      }
-    }
-
-    private void removeEmptySparkTask(SparkTask currTask) {
-      // If currTask is rootTasks, remove it and add its children to the rootTasks which currTask is its only parent
-      // task
-      if (pctx.getRootTasks().contains(currTask)) {
-        pctx.removeFromRootTask(currTask);
-        List<Task<? extends Serializable>> newRoots = currTask.getChildTasks();
-        for (Task<? extends Serializable> newRoot : newRoots) {
-          if (newRoot.getParentTasks().size() == 1) {
-            pctx.addToRootTask(newRoot);
-          }
-        }
-      }
-      SparkUtilities.removeEmptySparkTask(currTask);
-    }
   }
 
   // Merge the target works of the second DPP sink into the first DPP sink.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
index 1607a3f695..06b8391fa5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -141,4 +142,18 @@ public void removeTarget(String name) {
     }
     targetInfos.removeAll(toRemove);
   }
+
+  // Return a combined column name with corresponding target map work ID.
+  public static String colNameWithTargetId(MapWork target, String colName) {
+    return SparkUtilities.getWorkId(target) + ":" + colName;
+  }
+
+  // Return the column from a combined column name.
+  public static String stripOfTargetId(String name) {
+    int idx = name.indexOf(":");
+    if (idx != -1) {
+      return name.substring(idx + 1);
+    }
+    return name;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 900a80000d..757cb7af4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -475,7 +475,8 @@ public void processPartitionPruningSink(GenSparkProcContext context,
     }
 
     targetInfo.work = targetWork;
-    targetInfo.columnName = SparkUtilities.getWorkId(targetWork) + ":" + targetInfo.columnName;
+    targetInfo.columnName = SparkPartitionPruningSinkDesc.colNameWithTargetId(
+        targetWork, targetInfo.columnName);
 
     pruningSink.addAsSourceEvent(targetWork, targetInfo.partKey, targetInfo.columnName,
         targetInfo.columnType);
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q
new file mode 100644
index 0000000000..8bec9b838d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q
@@ -0,0 +1,12 @@
+set hive.spark.dynamic.partition.pruning=true;
+
+-- SORT_QUERY_RESULTS
+
+-- This qfile tests MapWorks won't be combined if they're targets of different DPP sinks
+
+-- MapWorks for srcpart shouldn't be combined because they're pruned by different DPP sinks
+explain
+select * from
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value);
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out
new file mode 100644
index 0000000000..56498403bb
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out
@@ -0,0 +1,185 @@
+PREHOOK: query: explain
+select * from
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 7 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target Columns: [Map 1 -> [ds:string (ds)]]
+                            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+        Map 8 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target Columns: [Map 4 -> [ds:string (ds)]]
+                            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 3 (PARTITION-LEVEL SORT, 4)
+        Reducer 5 <- Map 4 (PARTITION-LEVEL SORT, 4), Map 6 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), ds (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col1 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col1 (type: string)
+                      Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string)
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), ds (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col1 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col1 (type: string)
+                      Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string)
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col1 (type: string), _col0 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col1 (type: string), _col0 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+