diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9918676..676d4d0 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1934,6 +1934,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "Whether to send the query plan via local resource or RPC"),
     HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true,
         "Whether to generate the splits locally or in the AM (tez only)"),
+    HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true, "Whether to generate consistent split " +
+        "locations when generating splits in the AM"),
     HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez (Hadoop 2 only)"),
     HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez (Hadoop 2 only)"),
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 895f389..b944101 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -484,7 +484,6 @@ private String selectHost(String[] requestedHosts) {
     String host = null;
     if (requestedHosts != null && requestedHosts.length > 0) {
       // Pick the first host always. Weak attempt at cache affinity.
-      Arrays.sort(requestedHosts);
       host = requestedHosts[0];
       if (activeHosts.get(host) != null) {
         LOG.info("Selected host: " + host + " from requested hosts: " +
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 446916c..5f7b20b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -474,8 +474,12 @@ private void processAllEvents(String inputName,

       LOG.info("Setting vertex parallelism since we have seen all inputs.");

+      boolean generateConsistentSplits = HiveConf.getBoolVar(
+          conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS);
+      LOG.info("GenerateConsistentSplitsInHive=" + generateConsistentSplits);
       context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper
-          .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap,
+          .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]),
+              generateConsistentSplits)), emMap,
           rootInputSpecUpdate);
       finalSplits.clear();
     }
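Side note on the LlapTaskSchedulerService change above: dropping Arrays.sort() means the scheduler now honours the order of the requested hosts, which is exactly what the consistently rotated location hints produced later rely on. The following standalone sketch (not part of the patch; the helper and host names are made up) illustrates the difference:

// Illustrative sketch only: why removing Arrays.sort() in selectHost() matters.
// With sorting, the scheduler always picks the lexicographically smallest host,
// ignoring the preference order encoded in the location hints.
import java.util.Arrays;

public class HostSelectionSketch {
  // Mirrors the "pick the first host" idea from selectHost(); hypothetical helper, not the real scheduler.
  static String pickFirst(String[] requestedHosts, boolean sortFirst) {
    String[] hosts = requestedHosts.clone();
    if (sortFirst) {
      Arrays.sort(hosts); // old behaviour
    }
    return hosts[0];      // new behaviour: trust the hint order
  }

  public static void main(String[] args) {
    // Location hints for one split, already rotated so that node2 is the preferred (cache-affine) host.
    String[] hints = {"node2", "node1", "node3"};
    System.out.println("sorted selection:   " + pickFirst(hints, true));  // node1 -> affinity lost
    System.out.println("as-given selection: " + pickFirst(hints, false)); // node2 -> affinity kept
  }
}

With sorting in place, every split whose replica set contained the lexicographically smallest host would land on that host, defeating the per-split rotation computed in SplitGrouper.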
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index ccaecdc..d5a18cd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.exec.tez;

 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;

 import com.google.common.base.Preconditions;
@@ -26,10 +28,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -105,6 +109,8 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException
       pruner.prune();

       InputSplitInfoMem inputSplitInfo = null;
+      boolean generateConsistentSplits = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS);
+      LOG.info("GenerateConsistentSplitsInHive=" + generateConsistentSplits);
       String realInputFormatName = conf.get("mapred.input.format.class");
       boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
       if (groupingEnabled) {
@@ -123,6 +129,8 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException
             TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);

         InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+        // Sort the splits, so that subsequent grouping is consistent.
+        Arrays.sort(splits, new InputSplitComparator());
         LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
             + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);

@@ -132,7 +140,8 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException
         InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
         LOG.info("Number of grouped splits: " + flatSplits.length);

-        List<TaskLocationHint> locationHints = splitGrouper.createTaskLocationHints(flatSplits);
+        List<TaskLocationHint> locationHints =
+            splitGrouper.createTaskLocationHints(flatSplits, generateConsistentSplits);

         Utilities.clearWork(jobConf);

@@ -195,4 +204,47 @@ public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
       pruner.addEvent(e);
     }
   }
+
+  // Descending sort based on split size. Followed by file name. Followed by start position.
+  private static class InputSplitComparator implements Comparator<InputSplit> {
+    @Override
+    public int compare(InputSplit o1, InputSplit o2) {
+      try {
+        long len1 = o1.getLength();
+        long len2 = o2.getLength();
+        if (len1 < len2) {
+          return 1;
+        } else if (len1 == len2) {
+          // If the same size. Sort on file name followed by startPosition.
+          if (o1 instanceof FileSplit && o2 instanceof FileSplit) {
+            FileSplit fs1 = (FileSplit) o1;
+            FileSplit fs2 = (FileSplit) o2;
+            if (fs1.getPath() != null && fs2.getPath() != null) {
+              int pathComp = fs1.getPath().compareTo(fs2.getPath());
+              if (pathComp == 0) {
+                // Compare start position
+                long startPos1 = fs1.getStart();
+                long startPos2 = fs2.getStart();
+                if (startPos1 > startPos2) {
+                  return 1;
+                } else if (startPos1 < startPos2) {
+                  return -1;
+                } else {
+                  return 0;
+                }
+              } else {
+                return pathComp;
+              }
+            }
+          }
+          // No further checks if not a file split. Return equality.
+          return 0;
+        } else {
+          return -1;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Problem getting input split size", e);
+      }
+    }
+  }
 }
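For reference, a small standalone sketch (not part of the patch; class name, file names and sizes are invented) of the ordering that InputSplitComparator above establishes, expressed with Comparator combinators over mapred FileSplits:

// Illustrative sketch only: descending split size, then path, then start offset.
import java.util.Arrays;
import java.util.Comparator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class SplitOrderSketch {
  public static void main(String[] args) {
    FileSplit[] splits = {
        new FileSplit(new Path("/warehouse/t1/part-00001"), 0L, 128L, new String[0]),
        new FileSplit(new Path("/warehouse/t1/part-00000"), 0L, 256L, new String[0]),
        new FileSplit(new Path("/warehouse/t1/part-00000"), 256L, 256L, new String[0]),
    };

    // Same keys as the patch's comparator: size descending, then path, then start offset.
    Comparator<FileSplit> order = Comparator.<FileSplit>comparingLong(FileSplit::getLength).reversed()
        .thenComparing(fs -> fs.getPath().toString())
        .thenComparingLong(FileSplit::getStart);

    Arrays.sort(splits, order);
    for (FileSplit fs : splits) {
      System.out.println(fs.getPath() + " start=" + fs.getStart() + " len=" + fs.getLength());
    }
    // Expected order: part-00000@0 (256), part-00000@256 (256), part-00001@0 (128)
  }
}

Sorting the raw splits this way before grouping makes the grouping input deterministic, so the same query over the same data yields the same grouped splits and hence the same location hints.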
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index c169677..c12da37 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -24,8 +24,10 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;

 import org.apache.commons.logging.Log;
@@ -105,19 +107,39 @@
   /**
    * Create task location hints from a set of input splits
+   * @param splits the actual splits
+   * @param consistentLocations whether to re-order locations for each split, if it's a file split
    * @return taskLocationHints - 1 per input split specified
    * @throws IOException
    */
-  public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits) throws IOException {
+  public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits, boolean consistentLocations) throws IOException {

     List<TaskLocationHint> locationHints = Lists.newArrayListWithCapacity(splits.length);

     for (InputSplit split : splits) {
       String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null;
       if (rack == null) {
-        if (split.getLocations() != null) {
-          locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split
-              .getLocations())), null));
+        String [] locations = split.getLocations();
+        if (locations != null && locations.length > 0) {
+          // Worthwhile only if more than 1 split, consistentGroupingEnabled and is a FileSplit
+          if (consistentLocations && locations.length > 1 && split instanceof FileSplit) {
+            Arrays.sort(locations);
+            FileSplit fileSplit = (FileSplit) split;
+            Path path = fileSplit.getPath();
+            long startLocation = fileSplit.getStart();
+            int hashCode = Objects.hash(path, startLocation);
+            int startIndex = hashCode % locations.length;
+            LinkedHashSet<String> locationSet = new LinkedHashSet<>(locations.length);
+            // Set up the locations starting from startIndex, and wrapping around the sorted array.
+            for (int i = 0; i < locations.length; i++) {
+              int index = (startIndex + i) % locations.length;
+              locationSet.add(locations[index]);
+            }
+            locationHints.add(TaskLocationHint.createTaskLocationHint(locationSet, null));
+          } else {
+            locationHints.add(TaskLocationHint
+                .createTaskLocationHint(new LinkedHashSet<String>(Arrays.asList(split
+                    .getLocations())), null));
+          }
         } else {
           locationHints.add(TaskLocationHint.createTaskLocationHint(null, null));
         }
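Finally, a minimal standalone sketch of the location rotation added to createTaskLocationHints() above (not part of the patch; it uses plain Strings for paths and Math.floorMod to keep the index non-negative, whereas the patch hashes the actual Path and uses the % operator):

// Illustrative sketch only: consistent rotation of a split's replica hosts.
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

public class ConsistentLocationSketch {
  static Set<String> rotateLocations(String path, long start, String[] locations) {
    String[] sorted = locations.clone();
    Arrays.sort(sorted);                             // canonical order, independent of how replicas were reported
    int startIndex = Math.floorMod(Objects.hash(path, start), sorted.length);
    LinkedHashSet<String> ordered = new LinkedHashSet<>(sorted.length);
    for (int i = 0; i < sorted.length; i++) {        // start at startIndex and wrap around the sorted array
      ordered.add(sorted[(startIndex + i) % sorted.length]);
    }
    return ordered;
  }

  public static void main(String[] args) {
    // The same split yields the same preferred host no matter how the replicas are listed.
    System.out.println(rotateLocations("/warehouse/t1/part-00000", 0L,
        new String[]{"node3", "node1", "node2"}));
    System.out.println(rotateLocations("/warehouse/t1/part-00000", 0L,
        new String[]{"node2", "node3", "node1"}));
    // A different offset of the same file may rotate to a different preferred host.
    System.out.println(rotateLocations("/warehouse/t1/part-00000", 268435456L,
        new String[]{"node1", "node2", "node3"}));
  }
}

Because the preferred host depends only on the (path, start) pair and the sorted replica set, repeated runs hand the same split to the same daemon, which is what makes the LLAP cache affinity effective.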