diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index f3aa151..57f6c66 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -97,7 +97,8 @@ public HiveSplitGenerator(Configuration conf, MapWork work) throws IOException {
     // Assuming grouping enabled always.
     userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();
 
-    this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
+    this.splitLocationProvider =
+        Utils.getSplitLocationProvider(conf, work.getCacheAffinity(), LOG);
     LOG.info("SplitLocationProvider: " + splitLocationProvider);
 
     // Read all credentials into the credentials instance stored in JobConf.
@@ -123,14 +124,15 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE
     this.jobConf = new JobConf(conf);
 
-    this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
-    LOG.info("SplitLocationProvider: " + splitLocationProvider);
-
     // Read all credentials into the credentials instance stored in JobConf.
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
     this.work = Utilities.getMapWork(jobConf);
 
+    this.splitLocationProvider =
+        Utils.getSplitLocationProvider(conf, work.getCacheAffinity(), LOG);
+    LOG.info("SplitLocationProvider: " + splitLocationProvider);
+
     // Events can start coming in the moment the InputInitializer is created. The pruner
     // must be setup and initialized here so that it sets up it's structures to start accepting events.
     // Setting it up in initialize leads to a window where events may come in before the pruner is
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index b33f027..bc438bb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -32,11 +32,19 @@ import org.slf4j.Logger;
 
 public class Utils {
-  public static SplitLocationProvider getSplitLocationProvider(Configuration conf, Logger LOG) throws
+
+  public static SplitLocationProvider getSplitLocationProvider(Configuration conf, Logger LOG)
+      throws IOException {
+    // No MapWork available; fall back to checking the confs alone.
+    return getSplitLocationProvider(conf, true, LOG);
+  }
+
+  public static SplitLocationProvider getSplitLocationProvider(Configuration conf, boolean useCacheAffinity, Logger LOG) throws
       IOException {
     boolean useCustomLocations =
         HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap")
-        && HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
+        && HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS)
+        && useCacheAffinity;
     SplitLocationProvider splitLocationProvider;
     LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations);
     if (useCustomLocations) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index fa7a8a3..9298630 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -73,6 +73,25 @@
 @SuppressWarnings({"serial"})
 public class MapWork extends BaseWork {
 
+  public enum LlapIODescriptor {
+    DISABLED(null, false),
+    NO_INPUTS("no inputs", false),
+    UNKNOWN("unknown", false),
+    SOME_INPUTS("some inputs", false),
+    ACID("may be used (ACID table)", true),
+    ALL_INPUTS("all inputs", true),
+    CACHE_ONLY("all inputs (cache only)", true);
+
+    final String desc;
+    final boolean cached;
+
+    LlapIODescriptor(String desc, boolean cached) {
+      this.desc = desc;
+      this.cached = cached;
+    }
+
+  }
+
   // use LinkedHashMap to make sure the iteration order is
   // deterministic, to ease testing
   private LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>();
@@ -153,7 +172,7 @@
   private byte[] includedBuckets;
 
   /** Whether LLAP IO will be used for inputs. */
-  private String llapIoDesc;
+  private LlapIODescriptor llapIoDesc;
 
   private boolean isMergeFromResolver;
 
@@ -295,32 +314,32 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) {
         isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid, hasCacheOnly);
   }
 
-  private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny,
+  private static LlapIODescriptor deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny,
       boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap,
       boolean hasAcid, boolean hasCacheOnly) {
     if (!isLlapOn) {
-      return null; // LLAP IO is off, don't output.
+      return LlapIODescriptor.DISABLED; // LLAP IO is off, don't output.
     }
     if (!canWrapAny && !hasCacheOnly) {
-      return "no inputs"; // Cannot use with input formats.
+      return LlapIODescriptor.NO_INPUTS; // Cannot use with input formats.
     }
     if (!hasPathToPartInfo) {
-      return "unknown"; // No information to judge.
+      return LlapIODescriptor.UNKNOWN; // No information to judge.
     }
     int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0)
         + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0);
     if (varieties > 1) {
-      return "some inputs"; // Will probably never actually happen.
+      return LlapIODescriptor.SOME_INPUTS; // Will probably never actually happen.
     }
     if (hasAcid) {
-      return "may be used (ACID table)";
+      return LlapIODescriptor.ACID;
     }
     if (hasLlap) {
-      return "all inputs";
+      return LlapIODescriptor.ALL_INPUTS;
     }
     if (hasCacheOnly) {
-      return "all inputs (cache only)";
+      return LlapIODescriptor.CACHE_ONLY;
     }
-    return "no inputs";
+    return LlapIODescriptor.NO_INPUTS;
   }
 
   public void internTable(Interner<TableDesc> interner) {
@@ -370,11 +389,15 @@ public void setAliasToWork(
   }
 
   @Explain(displayName = "LLAP IO", vectorization = Vectorization.SUMMARY_PATH)
-  public String getLlapIoDesc() {
-    return llapIoDesc;
+  public String getLlapIoDescString() {
+    return llapIoDesc.desc;
+  }
+
+  public boolean getCacheAffinity() {
+    return llapIoDesc.cached;
   }
 
-  public void setNameToSplitSample(HashMap nameToSplitSample) {
+  public void setNameToSplitSample(HashMap<String, SplitSample> nameToSplitSample) {
     this.nameToSplitSample = nameToSplitSample;
   }