diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index 1cbc272207..cc913d757d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -380,11 +380,11 @@ else if (partRawRowObjectInspector.equals(tblRawRowObjectInspector)) { } } - for (PartitionDesc pd: conf.getAliasToPartnInfo().values()) { + conf.getAliasToPartnInfo().values().forEach(partitionDescs -> partitionDescs.forEach(pd -> { if (!tableNameToConf.containsKey(pd.getTableName())) { tableNameToConf.put(pd.getTableName(), hconf); } - } + })); return tableNameToConf; } @@ -405,16 +405,17 @@ public void initEmptyInputChildren(List> children, Configuration hco for (Operator child : children) { TableScanOperator tsOp = (TableScanOperator) child; StructObjectInspector soi = null; - PartitionDesc partDesc = conf.getAliasToPartnInfo().get(tsOp.getConf().getAlias()); - Configuration newConf = tableNameToConf.get(partDesc.getTableDesc().getTableName()); - Deserializer serde = partDesc.getTableDesc().getDeserializer(); - partDesc.setProperties(partDesc.getProperties()); - MapOpCtx opCtx = new MapOpCtx(tsOp.getConf().getAlias(), child, partDesc); - StructObjectInspector tableRowOI = (StructObjectInspector) serde.getObjectInspector(); - initObjectInspector(newConf, opCtx, tableRowOI); - soi = opCtx.rowObjectInspector; - child.getParentOperators().add(this); - childrenOpToOI.put(child, soi); + for (PartitionDesc partDesc : conf.getAliasToPartnInfo().get(tsOp.getConf().getAlias())) { + Configuration newConf = tableNameToConf.get(partDesc.getTableDesc().getTableName()); + Deserializer serde = partDesc.getTableDesc().getDeserializer(); + partDesc.setProperties(partDesc.getProperties()); + MapOpCtx opCtx = new MapOpCtx(tsOp.getConf().getAlias(), child, partDesc); + StructObjectInspector tableRowOI = (StructObjectInspector) serde.getObjectInspector(); + 
initObjectInspector(newConf, opCtx, tableRowOI); + soi = opCtx.rowObjectInspector; + child.getParentOperators().add(this); + childrenOpToOI.put(child, soi); + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 2ff9ad3251..8c30b81f78 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -3362,7 +3362,7 @@ public static String getVertexCounterName(String counter, String vertexName) { // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 // rows) if (isEmptyTable && !skipDummy) { - pathsToAdd.add(createDummyFileForEmptyTable(job, work, hiveScratchDir, alias)); + pathsToAdd.addAll(createDummyFileForEmptyTable(job, work, hiveScratchDir, alias)); } } @@ -3519,36 +3519,33 @@ private static void updatePathForMapWork(Path newPath, MapWork work, Path path) } @SuppressWarnings("rawtypes") - private static Path createDummyFileForEmptyTable(JobConf job, MapWork work, + private static List createDummyFileForEmptyTable(JobConf job, MapWork work, Path hiveScratchDir, String alias) throws Exception { + List paths = new ArrayList<>(); + List partitionDescs = work.getAliasToPartnInfo().get(alias); + for (PartitionDesc partDesc : partitionDescs) { + TableDesc tableDesc = partDesc.getTableDesc(); + if (tableDesc.isNonNative()) { + // if it does not need native storage, we can't create an empty file for it. + return paths; + } + Properties props = tableDesc.getProperties(); + HiveOutputFormat outFileFormat = HiveFileFormatUtils.getHiveOutputFormat(job, tableDesc); + Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job, props, false); - TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc(); - if (tableDesc.isNonNative()) { - // if it does not need native storage, we can't create an empty file for it. 
- return null; - } - - Properties props = tableDesc.getProperties(); - HiveOutputFormat outFileFormat = HiveFileFormatUtils.getHiveOutputFormat(job, tableDesc); - - Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job, props, false); - - LOG.info("Changed input file for alias {} to newPath", alias, newPath); - - // update the work - - LinkedHashMap> pathToAliases = work.getPathToAliases(); - ArrayList newList = new ArrayList(1); - newList.add(alias); - pathToAliases.put(newPath, newList); - - work.setPathToAliases(pathToAliases); - - PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone(); - work.addPathToPartitionInfo(newPath, pDesc); + if (paths.contains(newPath)) + continue; - - return newPath; + LOG.info("Changed input file for alias {} to {}", alias, newPath); + // update the work + LinkedHashMap> pathToAliases = work.getPathToAliases(); + pathToAliases.put(newPath, new ArrayList<>(Arrays.asList(alias))); + work.setPathToAliases(pathToAliases); + work.addPathToPartitionInfo(newPath, partDesc.clone()); + paths.add(newPath); + } + return paths; } private static final Path[] EMPTY_PATH = new Path[0]; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 01dd93c527..aa76ec31bc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -517,9 +517,9 @@ private void handleSampling(Context context, MapWork mWork, JobConf job) String alias = mWork.getAliases().get(0); Operator topOp = mWork.getAliasToWork().get(alias); - PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias); + PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias).get(0); - ArrayList parts = mWork.getPartitionDescs(); + List parts = mWork.getPartitionDescs(); List inputPaths = mWork.getPaths(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 91868a4667..30028dde99 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -20,9 +20,8 @@ import java.io.IOException; import java.net.URLClassLoader; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.stream.Collectors; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -95,8 +94,13 @@ public void configure(JobConf job) { // create map and fetch operators MapWork mrwork = Utilities.getMapWork(job); - for (PartitionDesc part : mrwork.getAliasToPartnInfo().values()) { - TableDesc tableDesc = part.getTableDesc(); + + Set tableDescs = mrwork.getAliasToPartnInfo().values().stream() + .flatMap(Collection::stream) + .map(PartitionDesc::getTableDesc) + .collect(Collectors.toSet()); + + for (TableDesc tableDesc : tableDescs) { Utilities.copyJobSecretToTableProperties(tableDesc); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java index 530131f207..bdf7fcba8f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java @@ -72,9 +72,11 @@ execContext = new ExecMapperContext(jc); // create map and fetch operators MapWork mrwork = Utilities.getMapWork(job); - for (PartitionDesc part : mrwork.getAliasToPartnInfo().values()) { - TableDesc tableDesc = part.getTableDesc(); - Utilities.copyJobSecretToTableProperties(tableDesc); + for (List partitionDescs : mrwork.getAliasToPartnInfo().values()) { + for (PartitionDesc part : partitionDescs) { + TableDesc tableDesc = part.getTableDesc(); + Utilities.copyJobSecretToTableProperties(tableDesc); + } } CompilationOpContext runtimeCtx = new 
CompilationOpContext(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 15c14c9be5..aa549d6a8e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -234,7 +234,7 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE String[] hosts = hostsSet.toArray(new String[0]); FileSplit fileSplit = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts); String alias = mapWork.getAliases().get(0); - PartitionDesc partDesc = mapWork.getAliasToPartnInfo().get(alias); + PartitionDesc partDesc = mapWork.getAliasToPartnInfo().get(alias).get(0); String partIF = partDesc.getInputFileFormatClassName(); splits[0] = new HiveInputFormat.HiveInputSplit(fileSplit, partIF); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index ea2e1fdb65..c051c30d1e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -140,9 +140,11 @@ public Object call() { // TODO HIVE-14042. Cleanup may be required if exiting early. 
Utilities.setMapWork(jconf, mapWork); - for (PartitionDesc part : mapWork.getAliasToPartnInfo().values()) { - TableDesc tableDesc = part.getTableDesc(); - Utilities.copyJobSecretToTableProperties(tableDesc); + for (List partitionDescs : mapWork.getAliasToPartnInfo().values()) { + for (PartitionDesc part : partitionDescs) { + TableDesc tableDesc = part.getTableDesc(); + Utilities.copyJobSecretToTableProperties(tableDesc); + } } String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java index 337b5d2e76..68e699a888 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java @@ -21,6 +21,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableMap; @@ -85,10 +86,13 @@ public TeradataBinaryRecordReader(JobConf job, FileSplit fileSplit) throws IOExc String rowLength = job.get(TD_ROW_LENGTH); if (rowLength == null) { LOG.debug("No table property in JobConf. 
Try to recover the table directly"); - Map partitionDescMap = Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo(); - for (String alias : Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo().keySet()) { - LOG.debug(format("the current alias: %s", alias)); - rowLength = partitionDescMap.get(alias).getTableDesc().getProperties().getProperty(TD_ROW_LENGTH); + for (Map.Entry> entry : Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo().entrySet()) { + LOG.debug(format("the current alias: %s", entry.getKey())); + rowLength = entry.getValue().stream() + .map(partitionDesc -> partitionDesc.getTableDesc().getProperties().getProperty(TD_ROW_LENGTH)) + .filter(prop -> prop != null) + .findFirst() + .orElse(null); if (rowLength != null) { break; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 2131bf131d..f118195f8e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -538,7 +538,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set(Arrays.asList(aliasPartnDesc))); long sizeNeeded = Integer.MAX_VALUE; int fileLimit = -1; @@ -1169,12 +1169,12 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, Map> sourcePathToAliases = source.getPathToAliases(); Map sourcePathToPartitionInfo = source.getPathToPartitionInfo(); Map> sourceAliasToWork = source.getAliasToWork(); - Map sourceAliasToPartnInfo = source.getAliasToPartnInfo(); + Map> sourceAliasToPartnInfo = source.getAliasToPartnInfo(); LinkedHashMap> targetPathToAliases = target.getPathToAliases(); LinkedHashMap targetPathToPartitionInfo = target.getPathToPartitionInfo(); Map> targetAliasToWork = target.getAliasToWork(); - Map targetAliasToPartnInfo = target.getAliasToPartnInfo(); + Map> targetAliasToPartnInfo = target.getAliasToPartnInfo(); if 
(!sourceAliasToWork.containsKey(sourceAlias) || !targetAliasToWork.containsKey(targetAlias)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index ceeeb8f0a3..635c545bb0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -188,7 +188,7 @@ private static void genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJ } // create fetchwork for partitioned table if (fetchWork == null) { - TableDesc table = newWork.getMapWork().getAliasToPartnInfo().get(alias).getTableDesc(); + TableDesc table = newWork.getMapWork().getAliasToPartnInfo().get(alias).get(0).getTableDesc(); fetchWork = new FetchWork(partDir, partDesc, table); } // set alias to fetch work diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index f7cedfe3be..aea0ab487f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -19,11 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer.physical; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StringInternUtils; @@ -271,7 +267,7 @@ public static void processSkewJoin(JoinOperator joinOp, PartitionDesc part = new PartitionDesc(tableDescList.get(src), null); newPlan.addPathToPartitionInfo(bigKeyDirPath, part); - newPlan.getAliasToPartnInfo().put(alias, part); + newPlan.getAliasToPartnInfo().put(alias, new ArrayList<>(Arrays.asList(part))); Operator reducer = 
clonePlan.getReduceWork().getReducer(); assert reducer instanceof JoinOperator; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index 7f7f49ba8c..9a7b750d73 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -19,10 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer.physical; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -296,7 +293,7 @@ public static void processSkewJoin(JoinOperator joinOp, Task(Arrays.asList(partitionDesc))); mapWork.setName("Map " + GenSparkUtils.getUtils().getNextSeqNumber()); } // connect all small dir map work to the big dir map work diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index d077b1974d..4ee227457e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -75,7 +75,7 @@ public SortMergeJoinTaskDispatcher(PhysicalContext context) { // plan are fixed. 
The operator tree will still contain the SMBJoinOperator private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) { // Remove the paths which are not part of aliasToPartitionInfo - Map aliasToPartitionInfo = currWork.getAliasToPartnInfo(); + Map> aliasToPartitionInfo = currWork.getAliasToPartnInfo(); List removePaths = new ArrayList<>(); for (Map.Entry> entry : currWork.getPathToAliases().entrySet()) { @@ -117,12 +117,13 @@ private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) { // Add the entry in mapredwork currWork.getAliasToWork().put(alias, op); - PartitionDesc partitionInfo = currWork.getAliasToPartnInfo().get(alias); - if (fetchWork.getTblDir() != null) { - currWork.mergeAliasedInput(alias, fetchWork.getTblDir(), partitionInfo); - } else { - for (Path pathDir : fetchWork.getPartDir()) { - currWork.mergeAliasedInput(alias, pathDir, partitionInfo); + for (PartitionDesc partitionInfo : currWork.getAliasToPartnInfo().get(alias)) { + if (fetchWork.getTblDir() != null) { + currWork.mergeAliasedInput(alias, fetchWork.getTblDir(), partitionInfo); + } else { + for (Path pathDir : fetchWork.getPartDir()) { + currWork.mergeAliasedInput(alias, pathDir, partitionInfo); + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index d5a30da419..cd7037ff71 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -100,7 +101,7 @@ private LinkedHashMap> aliasToWork = new LinkedHashMap>(); - private LinkedHashMap aliasToPartnInfo = new LinkedHashMap(); + private LinkedHashMap> aliasToPartnInfo = new LinkedHashMap<>(); private HashMap nameToSplitSample = new LinkedHashMap(); @@ -347,12 
+348,11 @@ private static LlapIODescriptor deriveLlapIoDescString(boolean isLlapOn, boolean public void internTable(Interner interner) { if (aliasToPartnInfo != null) { - for (PartitionDesc part : aliasToPartnInfo.values()) { - if (part == null) { - continue; - } - part.intern(interner); - } + aliasToPartnInfo.values().forEach(partitions -> partitions.forEach(part -> { + if (part != null) { + part.intern(interner); + } + })); } if (pathToPartitionInfo != null) { for (PartitionDesc part : pathToPartitionInfo.values()) { @@ -364,7 +364,7 @@ public void internTable(Interner interner) { /** * @return the aliasToPartnInfo */ - public LinkedHashMap getAliasToPartnInfo() { + public LinkedHashMap> getAliasToPartnInfo() { return aliasToPartnInfo; } @@ -373,7 +373,7 @@ public void internTable(Interner interner) { * the aliasToPartnInfo to set */ public void setAliasToPartnInfo( - LinkedHashMap aliasToPartnInfo) { + LinkedHashMap> aliasToPartnInfo) { this.aliasToPartnInfo = aliasToPartnInfo; } @@ -604,8 +604,8 @@ public boolean isMapperCannotSpanPartns() { return new ArrayList(pathToAliases.keySet()); } - public ArrayList getPartitionDescs() { - return new ArrayList(aliasToPartnInfo.values()); + public List getPartitionDescs() { + return aliasToPartnInfo.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); } public Path getTmpHDFSPath() { @@ -655,9 +655,8 @@ public String getSamplingTypeString() { @Override public void configureJobConf(JobConf job) { - for (PartitionDesc partition : aliasToPartnInfo.values()) { - PlanUtils.configureJobConf(partition.getTableDesc(), job); - } + aliasToPartnInfo.values().forEach(partitions -> partitions.forEach(p -> PlanUtils.configureJobConf(p.getTableDesc(), job))); + Collection> mappers = aliasToWork.values(); for (FileSinkOperator fs : OperatorUtils.findOperators(mappers, FileSinkOperator.class)) { PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job);