diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index dfbadf7..79c276b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -27,6 +27,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -1638,7 +1639,7 @@ public int execute() throws CommandNeedRetryException {
       // remove incomplete outputs.
       // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
       // remove them
-      HashSet<WriteEntity> remOutputs = new HashSet<WriteEntity>();
+      HashSet<WriteEntity> remOutputs = new LinkedHashSet<WriteEntity>();
       for (WriteEntity output : plan.getOutputs()) {
         if (!output.isComplete()) {
           remOutputs.add(output);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 786e17f..5ac523a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -69,6 +69,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -426,7 +427,7 @@ public int execute(DriverContext driverContext) {
           // For DP, WriteEntity creation is deferred at this stage so we need to update
           // queryPlan here.
           if (queryPlan.getOutputs() == null) {
-            queryPlan.setOutputs(new HashSet<WriteEntity>());
+            queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
           }
           queryPlan.getOutputs().add(enty);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java
index 617723e..06e7547 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.index;
 
 import java.io.Serializable;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -34,7 +34,7 @@
  */
 public class HiveIndexQueryContext {
 
-  private HashSet<ReadEntity> additionalSemanticInputs; // additional inputs to add to the parse context when
+  private Set<ReadEntity> additionalSemanticInputs;     // additional inputs to add to the parse context when
                                                         // merging the index query tasks
   private String indexInputFormat;        // input format to set on the TableScanOperator to activate indexing
   private String indexIntermediateFile;   // name of intermediate file written by the index query for the
@@ -52,12 +52,12 @@ public HiveIndexQueryContext() {
     this.queryTasks = null;
   }
 
-  public HashSet<ReadEntity> getAdditionalSemanticInputs() {
+  public Set<ReadEntity> getAdditionalSemanticInputs() {
    return additionalSemanticInputs;
   }
-  public void addAdditionalSemanticInputs(HashSet<ReadEntity> additionalParseInputs) {
+  public void addAdditionalSemanticInputs(Set<ReadEntity> additionalParseInputs) {
     if (this.additionalSemanticInputs == null) {
-      this.additionalSemanticInputs = new HashSet<ReadEntity>();
+      this.additionalSemanticInputs = new LinkedHashSet<ReadEntity>();
     }
     this.additionalSemanticInputs.addAll(additionalParseInputs);
   }
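Note on the HashSet to LinkedHashSet swaps above: both offer O(1) add/contains, but only LinkedHashSet guarantees iteration in insertion order, so the query's input and output entity sets are walked in a deterministic order (which matters for hooks and test output). Since LinkedHashSet extends HashSet, the existing HashSet-typed fields and setter signatures accept it unchanged. The standalone sketch below illustrates the ordering difference; IterationOrderDemo and the entity-name strings are made up for illustration and are not part of the patch.

// IterationOrderDemo.java: standalone sketch, not part of the patch.
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class IterationOrderDemo {
  public static void main(String[] args) {
    // Made-up entity names in the style of Hive read/write entities.
    String[] outputs = {"default@dst_part/ds=2008", "default@dst_part/ds=2009", "default@src"};

    Set<String> plain = new HashSet<String>();
    Set<String> ordered = new LinkedHashSet<String>();
    for (String o : outputs) {
      plain.add(o);
      ordered.add(o);
    }

    // HashSet iteration order is unspecified and may differ across JVMs and runs.
    System.out.println("HashSet:       " + plain);
    // LinkedHashSet always iterates in insertion order, with the same O(1)
    // add/contains cost, so downstream consumers see a stable ordering.
    System.out.println("LinkedHashSet: " + ordered);
  }
}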
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index de5cb3a..05dfc4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -576,13 +576,9 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx,
+      Set<Partition> parts, Collection<ReadEntity> inputs,
+      ReadEntity parentViewInfo, boolean isDirectRead) {
+    // Store the inputs in a HashMap since we can't get a ReadEntity from inputs since it is
+    // implemented as a set. ReadEntity is used as the key so that the HashMap has the same
+    // behavior of equals and hashCode.
+    Map<ReadEntity, ReadEntity> readEntityMap =
+        new LinkedHashMap<ReadEntity, ReadEntity>(inputs.size());
+    for (ReadEntity input : inputs) {
+      readEntityMap.put(input, input);
+    }
+
+    for (Partition part : parts) {
+      ReadEntity newInput = null;
+      if (part.getTable().isPartitioned()) {
+        newInput = new ReadEntity(part, parentViewInfo, isDirectRead);
+      } else {
+        newInput = new ReadEntity(part.getTable(), parentViewInfo, isDirectRead);
+      }
+
+      if (readEntityMap.containsKey(newInput)) {
+        ReadEntity input = readEntityMap.get(newInput);
+        if ((newInput.getParents() != null) && (!newInput.getParents().isEmpty())) {
+          input.getParents().addAll(newInput.getParents());
+          input.setDirect(input.isDirect() || newInput.isDirect());
+        }
+      } else {
+        readEntityMap.put(newInput, newInput);
+      }
+    }
+
+    // Add the new ReadEntity instances that were added to readEntityMap in PlanUtils.addInput
+    if (inputs.size() != readEntityMap.size()) {
+      inputs.addAll(readEntityMap.keySet());
+    }
+  }
+
   public static void addInputsForView(ParseContext parseCtx) throws HiveException {
     Set<ReadEntity> inputs = parseCtx.getSemanticInputs();
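The new helper added above works around java.util.Set having no get(): to merge a ReadEntity that is equal to one already collected (union its parents, keep the stronger direct flag), the already-stored element must be retrievable, so the inputs are mirrored into a LinkedHashMap keyed by the entity itself. The self-contained sketch below shows the same map-as-indexable-set pattern; Entity, MergeIntoSetDemo, and the simplified merge rules are illustrative stand-ins, not the Hive classes.

// MergeIntoSetDemo.java: standalone sketch of the map-backed merge pattern.
// Entity is a made-up stand-in for ReadEntity; the real Hive merge rules differ slightly.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class Entity {
  final String name;                                      // equality is by name only
  final List<String> parents = new ArrayList<String>();
  boolean direct;

  Entity(String name, boolean direct) { this.name = name; this.direct = direct; }

  @Override public boolean equals(Object o) {
    return o instanceof Entity && ((Entity) o).name.equals(name);
  }
  @Override public int hashCode() { return name.hashCode(); }
  @Override public String toString() { return name + (direct ? "[direct]" : "") + parents; }
}

public class MergeIntoSetDemo {

  // A Set has no get(), so mirror it into a map keyed by the element itself;
  // an equal candidate then retrieves the element that is already stored.
  static void merge(Set<Entity> inputs, Collection<Entity> candidates) {
    Map<Entity, Entity> byKey = new LinkedHashMap<Entity, Entity>(Math.max(16, inputs.size()));
    for (Entity e : inputs) {
      byKey.put(e, e);
    }
    for (Entity candidate : candidates) {
      Entity existing = byKey.get(candidate);
      if (existing != null) {
        existing.parents.addAll(candidate.parents);            // union the parents
        existing.direct = existing.direct || candidate.direct; // keep the stronger flag
      } else {
        byKey.put(candidate, candidate);
      }
    }
    // Copy any genuinely new elements back into the original set.
    inputs.addAll(byKey.keySet());
  }

  public static void main(String[] args) {
    Set<Entity> inputs = new LinkedHashSet<Entity>();
    inputs.add(new Entity("db.t1", false));

    Entity dup = new Entity("db.t1", true);     // equal to the stored entity, but direct
    dup.parents.add("db.view_v1");
    Entity fresh = new Entity("db.t2", true);   // not present yet

    merge(inputs, Arrays.asList(dup, fresh));
    // db.t1 is now direct with parent db.view_v1; db.t2 has been appended in order.
    System.out.println(inputs);
  }
}

Using a LinkedHashMap rather than a plain HashMap keeps the insertion-order guarantee consistent with the LinkedHashSet changes elsewhere in the patch.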