diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cc233f7..72022ad 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2831,6 +2831,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes\n" +
         "and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as\n" +
         "necessary."),
+    TEZ_LLAP_MIN_REDUCER_PER_EXECUTOR("hive.tez.llap.min.reducer.per.executor", 0.95f,
+        "If above 0, the min number of reducers for auto-parallelism for LLAP scheduling will\n" +
+        "be set to this fraction of the number of executors."),
     TEZ_MAX_PARTITION_FACTOR("hive.tez.max.partition.factor", 2f,
         "When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges."),
     TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f,
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 610c0a5..76fc9c7 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -61,7 +61,7 @@ public static synchronized LlapRegistryService getClient(Configuration conf) {
     if (hosts.startsWith("@")) {
       // Caching instances only in case of the YARN registry. Each host based list will get it's own copy.
       String appName = hosts.substring(1);
-      String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
+      String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, currentUser());
       String key = appName + "-" + userName;
       registry = yarnRegistries.get(key);
       if (registry == null || !registry.isInState(STATE.STARTED)) {
@@ -79,6 +79,9 @@ public static synchronized LlapRegistryService getClient(Configuration conf) {
     return registry;
   }

+  public static String currentUser() {
+    return RegistryUtils.currentUser();
+  }
+
   @Override
   public void serviceInit(Configuration conf) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
index 60a8604..b51af55 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
@@ -41,6 +41,7 @@

 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED;

 /**
  * SetReducerParallelism determines how many reducers should
@@ -106,6 +107,7 @@ public Object process(Node nd, Stack<Node> stack,
       }
     } else {
       LOG.info("Number of reducers determined to be: "+desc.getNumReducers());
+      desc.setReducerTraits(EnumSet.of(FIXED)); // usually controlled by bucketing
     }

     return false;
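For scale: with the default fraction of 0.95, the new setting puts the floor on reducer count at roughly one reducer per executor slot in the cluster. The standalone sketch below reproduces that arithmetic; the class and method names are illustrative, not Hive API, and the executor counts are made-up examples.

    // Illustrative only: how a fraction-of-executors floor for reducer counts
    // plays out numerically (mirrors LlapDecider.adjustAutoParallelism later
    // in this patch). Names and numbers here are hypothetical.
    public class MinReducerMath {
      static int targetReducers(float minReducersPerExec, int knownExecutors,
          int nodesWithUnknownExecutors, int executorsPerNode) {
        // Nodes that do not publish an executor count are assumed to run the
        // configured hive.llap.daemon.num.executors per node.
        return (int) Math.ceil(minReducersPerExec
            * (knownExecutors + nodesWithUnknownExecutors * executorsPerNode));
      }

      public static void main(String[] args) {
        // 10 daemons publishing 4 executors each: ceil(0.95 * 40) = 38.
        System.out.println(targetReducers(0.95f, 40, 0, 4)); // 38
        // 8 daemons known (32 executors), 2 unknown at 4 each: still 38.
        System.out.println(targetReducers(0.95f, 32, 2, 4)); // 38
      }
    }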
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
new file mode 100644
index 0000000..323c64d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapClusterStateForCompile {
+  protected static final Logger LOG = LoggerFactory.getLogger(LlapClusterStateForCompile.class);
+
+  private static final long CLUSTER_UPDATE_INTERVAL_NS = 120 * 1000000000L; // 2 minutes.
+  private Long lastClusterUpdateNs;
+  private Integer noConfigNodeCount, executorCount;
+  private final Object svcInitLock = new Object();
+  private LlapRegistryService svc;
+  private final Configuration conf;
+
+  // It's difficult, if not impossible, to pass global things to compilation, so we have a static cache.
+  private static final Cache<String, LlapClusterStateForCompile> CACHE =
+      CacheBuilder.newBuilder().initialCapacity(10).maximumSize(100).build();
+
+  public static LlapClusterStateForCompile getClusterInfo(final Configuration conf) {
+    final String nodes = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+    final String userName = HiveConf.getVar(
+        conf, ConfVars.LLAP_ZK_REGISTRY_USER, LlapRegistryService.currentUser());
+    Callable<LlapClusterStateForCompile> generator = new Callable<LlapClusterStateForCompile>() {
+      @Override
+      public LlapClusterStateForCompile call() throws Exception {
+        LOG.info("Creating cluster info for " + userName + ":" + nodes);
+        return new LlapClusterStateForCompile(conf);
+      }
+    };
+    try {
+      return CACHE.get(userName + ":" + nodes, generator);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e); // Should never happen... ctor is just assignments.
+    }
+  }
+
+  private LlapClusterStateForCompile(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public boolean hasClusterInfo() {
+    return lastClusterUpdateNs != null;
+  }
+
+  public int getKnownExecutorCount() {
+    return executorCount;
+  }
+
+  public int getNodeCountWithUnknownExecutors() {
+    return noConfigNodeCount;
+  }
+
+  public void initClusterInfo() {
+    if (lastClusterUpdateNs != null) {
+      long elapsed = System.nanoTime() - lastClusterUpdateNs;
+      if (elapsed < CLUSTER_UPDATE_INTERVAL_NS) return;
+    }
+    synchronized (svcInitLock) {
+      if (svc == null) {
+        try {
+          svc = LlapRegistryService.getClient(conf);
+        } catch (Throwable t) {
+          LOG.info("Cannot create the client; ignoring", t);
+          return; // Don't fail; this is best-effort.
+        }
+      }
+    }
+    ServiceInstanceSet instances;
+    try {
+      instances = svc.getInstances(10);
+    } catch (IOException e) {
+      LOG.info("Cannot update cluster information; ignoring", e);
+      return; // Don't wait for the cluster if not started; this is best-effort.
+    }
+    int executorsLocal = 0, noConfigNodesLocal = 0;
+    for (ServiceInstance si : instances.getAll()) {
+      if (si instanceof InactiveServiceInstance) continue; // Shouldn't happen in getAll.
+      Map<String, String> props = si.getProperties();
+      if (props == null) {
+        ++noConfigNodesLocal;
+        continue;
+      }
+      try {
+        executorsLocal += Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+      } catch (NumberFormatException e) {
+        ++noConfigNodesLocal;
+      }
+    }
+    lastClusterUpdateNs = System.nanoTime();
+    noConfigNodeCount = noConfigNodesLocal;
+    executorCount = executorsLocal;
+  }
+}
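Two details in this new class deserve a note: compilation has no clean way to receive global state, so instances are shared through a static Guava cache keyed by "user:hosts", and refreshes are gated on System.nanoTime() so the registry is queried at most once per interval. A minimal sketch of the same two patterns, under simplified, hypothetical names:

    import java.util.concurrent.ExecutionException;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    // A stripped-down sketch of the patterns above; class and field names
    // are illustrative, not the Hive ones.
    public class TimeGatedCache {
      private static final long UPDATE_INTERVAL_NS = 120L * 1_000_000_000L; // 2 min
      private static final Cache<String, TimeGatedCache> CACHE =
          CacheBuilder.newBuilder().maximumSize(100).build();

      private Long lastUpdateNs; // null until the first successful refresh

      public static TimeGatedCache get(String userName, String hosts) {
        try {
          // Same key shape as getClusterInfo: one shared instance per
          // user and host list.
          return CACHE.get(userName + ":" + hosts, TimeGatedCache::new);
        } catch (ExecutionException e) {
          throw new RuntimeException(e); // loader is a no-arg ctor; cannot fail
        }
      }

      public void refreshIfStale() {
        if (lastUpdateNs != null
            && System.nanoTime() - lastUpdateNs < UPDATE_INTERVAL_NS) {
          return; // Fresh enough; skip the (potentially slow) registry call.
        }
        // ... query the cluster here, best-effort ...
        lastUpdateNs = System.nanoTime();
      }
    }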
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
index a694cf8..c81131e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
@@ -26,7 +26,6 @@
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.EnumSet;
@@ -51,7 +50,6 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -106,12 +104,20 @@
   }

   private LlapMode mode;
+  private final LlapClusterStateForCompile clusterState;
+
+  public LlapDecider(LlapClusterStateForCompile clusterState) {
+    this.clusterState = clusterState;
+  }
+
   class LlapDecisionDispatcher implements Dispatcher {
     private final HiveConf conf;
     private final boolean doSkipUdfCheck;
     private final boolean arePermanentFnsAllowed;
     private final boolean shouldUber;
+    private final float minReducersPerExec;
+    private final int executorsPerNode;
     private List<MapJoinOperator> mapJoinOpList;
     private final Map<Rule, NodeProcessor> rules;

@@ -121,6 +127,9 @@ public LlapDecisionDispatcher(PhysicalContext pctx, LlapMode mode) {
       arePermanentFnsAllowed = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOW_PERMANENT_FNS);
       // Don't user uber in "all" mode - everything can go into LLAP, which is better than uber.
       shouldUber = HiveConf.getBoolVar(conf, ConfVars.LLAP_AUTO_ALLOW_UBER) && (mode != all);
+      minReducersPerExec = HiveConf.getFloatVar(
+          conf, ConfVars.TEZ_LLAP_MIN_REDUCER_PER_EXECUTOR);
+      executorsPerNode = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
       mapJoinOpList = new ArrayList<MapJoinOperator>();
       rules = getRules();
     }
@@ -139,22 +148,57 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
       return null;
     }

-    private void handleWork(TezWork tezWork, BaseWork work)
-        throws SemanticException {
+    private void handleWork(TezWork tezWork, BaseWork work) throws SemanticException {
       boolean workCanBeDoneInLlap = evaluateWork(tezWork, work);
       LOG.debug(
           "Work " + work + " " + (workCanBeDoneInLlap ? "can" : "cannot") + " be done in LLAP");
       if (workCanBeDoneInLlap) {
         for (MapJoinOperator graceMapJoinOp : mapJoinOpList) {
-          LOG.debug(
-              "Disabling hybrid grace hash join in case of LLAP and non-dynamic partition hash join.");
+          LOG.debug("Disabling hybrid grace hash join in case of LLAP "
+              + "and non-dynamic partition hash join.");
           graceMapJoinOp.getConf().setHybridHashJoin(false);
         }
+        adjustAutoParallelism(work);
+
         convertWork(tezWork, work);
       }
       mapJoinOpList.clear();
     }

+    private void adjustAutoParallelism(BaseWork work) {
+      if (minReducersPerExec <= 0 || !(work instanceof ReduceWork)) return;
+      ReduceWork reduceWork = (ReduceWork) work;
+      if (!reduceWork.isAutoReduceParallelism() && !reduceWork.isUniformDistribution()) {
+        return; // Not based on ARP and cannot assume uniform distribution, bail.
+      }
+      clusterState.initClusterInfo();
+      int targetCount = 0;
+      if (!clusterState.hasClusterInfo()) {
+        LOG.warn("Cannot determine LLAP cluster information");
+        targetCount = (int) Math.ceil(minReducersPerExec * 1 * executorsPerNode);
+      } else {
+        targetCount = (int) Math.ceil(minReducersPerExec * (clusterState.getKnownExecutorCount()
+            + clusterState.getNodeCountWithUnknownExecutors() * executorsPerNode));
+      }
+      // We only increase the targets here.
+      if (reduceWork.isAutoReduceParallelism()) {
+        int newMin = Math.max(reduceWork.getMinReduceTasks(), targetCount);
+        if (newMin < reduceWork.getMaxReduceTasks()) {
+          reduceWork.setMinReduceTasks(newMin);
+          reduceWork.getEdgePropRef().setAutoReduce(conf, true, newMin,
+              reduceWork.getMaxReduceTasks(), conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER));
+        } else {
+          reduceWork.setAutoReduceParallelism(false);
+          reduceWork.setNumReduceTasks(newMin);
+          // TODO: is this correct? Based on the same logic as HIVE-14200.
+          reduceWork.getEdgePropRef().setAutoReduce(null, false, 0, 0, 0);
+        }
+      } else {
+        // UNIFORM || AUTOPARALLEL (maxed out)
+        reduceWork.setNumReduceTasks(Math.max(reduceWork.getNumReduceTasks(), targetCount));
+      }
+    }
+
     private void convertWork(TezWork tezWork, BaseWork work)
         throws SemanticException {
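adjustAutoParallelism has three outcomes: raise Tez's minimum and keep auto-reduction on when the target still leaves headroom below the max; pin the count and turn auto-reduction off when the target meets or exceeds the max (the HIVE-14200-style fallback); or, for UNIFORM-only edges, simply raise the fixed reducer count. A hypothetical standalone rendering of the first two branches, with stand-in variable names (minTasks/maxTasks are not Hive API):

    // Illustrative only: the clamping logic of adjustAutoParallelism with
    // made-up numbers, so the two branches are concrete.
    public class AutoParallelismClamp {
      public static void main(String[] args) {
        int targetCount = 38; // e.g. ceil(0.95 * 40 executors)

        // Case 1: target fits under the max -> raise only the lower bound
        // and leave Tez auto-reduction on (it can still pick in range).
        int minTasks = 10, maxTasks = 100;
        int newMin = Math.max(minTasks, targetCount); // 38
        System.out.println(newMin < maxTasks
            ? "auto-reduce stays on, min raised to " + newMin   // this branch
            : "auto-reduce off, fixed at " + newMin);

        // Case 2: target meets or exceeds the max -> auto-reduction has no
        // room to move, so pin the count and disable it.
        minTasks = 10; maxTasks = 30;
        newMin = Math.max(minTasks, targetCount); // 38 >= 30
        System.out.println(newMin < maxTasks
            ? "auto-reduce stays on, min raised to " + newMin
            : "auto-reduce off, fixed at " + newMin);           // this branch
      }
    }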
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index d58f447..257fab3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;

 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;

 import java.util.*;

@@ -96,6 +97,7 @@ public static ReduceWork createReduceWork(

     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
     reduceWork.setSlowStart(reduceSink.getConf().isSlowStart());
+    reduceWork.setUniformDistribution(reduceSink.getConf().getReducerTraits().contains(UNIFORM));

     if (isAutoReduceParallelism
         && reduceSink.getConf().getReducerTraits().contains(AUTOPARALLEL)) {
@@ -139,6 +142,7 @@ public static ReduceWork createReduceWork(
       edgeProp = new TezEdgeProperty(edgeType);
       edgeProp.setSlowStart(reduceWork.isSlowStart());
     }
+    reduceWork.setEdgePropRef(edgeProp);

     tezWork.connect(
         context.preceedingWork,
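This plumbing matters because LlapDecider runs long after SetReducerParallelism assigned traits: createReduceWork now copies the UNIFORM bit from the reduce sink's trait set onto the ReduceWork, and stashes the edge property so the decider can retune it later. A minimal sketch of the trait check, with an enum that mirrors ReduceSinkDesc.ReducerTraits but is otherwise illustrative:

    import java.util.EnumSet;

    // Minimal sketch of the trait plumbing above. The enum values mirror
    // ReduceSinkDesc.ReducerTraits; the class itself is illustrative.
    public class TraitCheck {
      enum ReducerTraits { UNSET, FIXED, UNIFORM, AUTOPARALLEL }

      public static void main(String[] args) {
        EnumSet<ReducerTraits> traits =
            EnumSet.of(ReducerTraits.UNIFORM, ReducerTraits.AUTOPARALLEL);
        // What createReduceWork now forwards to setUniformDistribution():
        boolean uniform = traits.contains(ReducerTraits.UNIFORM);
        System.out.println("uniform distribution: " + uniform); // true
      }
    }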
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 62bd652..d38f715 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.parse;

+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
+
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.*;
@@ -563,7 +565,8 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
     }

     if ("llap".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
-      physicalCtx = new LlapDecider().resolve(physicalCtx);
+      LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
+      physicalCtx = new LlapDecider(llapInfo).resolve(physicalCtx);
     } else {
       LOG.debug("Skipping llap decider");
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index dfed017..7f65074 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -85,6 +85,8 @@ public ReduceWork(String name) {

   // boolean that says whether tez auto reduce parallelism should be used
   private boolean isAutoReduceParallelism;
+  // boolean that says whether the data distribution is uniform hash (not java HashCode)
+  private transient boolean isUniformDistribution = false;

   // boolean that says whether to slow start or not
   private boolean isSlowStart = true;
@@ -101,6 +103,8 @@ public ReduceWork(String name) {
   private boolean reduceVectorizationEnabled;
   private String vectorReduceEngine;

+  private transient TezEdgeProperty edgeProp;
+
   /**
    * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
    * to keySerializeInfo of the ReduceSink
@@ -228,6 +232,15 @@ public void setSlowStart(boolean isSlowStart) {
     this.isSlowStart = isSlowStart;
   }

+  // ReducerTraits.UNIFORM
+  public void setUniformDistribution(boolean isUniformDistribution) {
+    this.isUniformDistribution = isUniformDistribution;
+  }
+
+  public boolean isUniformDistribution() {
+    return this.isUniformDistribution;
+  }
+
   public void setMinReduceTasks(int minReduceTasks) {
     this.minReduceTasks = minReduceTasks;
   }
@@ -320,4 +333,12 @@ public ReduceExplainVectorization getReduceExplainVectorization() {
     }
     return new ReduceExplainVectorization(this);
   }
+
+  public void setEdgePropRef(TezEdgeProperty edgeProp) {
+    this.edgeProp = edgeProp;
+  }
+
+  public TezEdgeProperty getEdgePropRef() {
+    return edgeProp;
+  }
 }
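Note that both new ReduceWork fields are transient: they are compile-time hints that never travel with the serialized plan. A toy demonstration of what transient buys here, using plain java.io serialization (Hive's actual plan serialization differs, so this is only an analogy):

    import java.io.*;

    // Anything marked transient is dropped when the object is serialized,
    // so hints like these exist only in the compiler's in-memory plan.
    public class TransientDemo {
      static class Work implements Serializable {
        boolean persisted = true;
        transient boolean compileOnlyHint = true;
      }

      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new Work());
        oos.flush();
        Work copy = (Work) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(copy.persisted);       // true
        System.out.println(copy.compileOnlyHint); // false: transient was dropped
      }
    }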
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index d87bee3..5d7ddc8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -50,6 +50,12 @@ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
   public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
       boolean isSlowStart, int minReducer, int maxReducer, long bytesPerReducer) {
     this(hiveConf, edgeType, -1);
+    setAutoReduce(hiveConf, isAutoReduce, minReducer, maxReducer, bytesPerReducer);
+  }
+
+  public void setAutoReduce(HiveConf hiveConf, boolean isAutoReduce, int minReducer,
+      int maxReducer, long bytesPerReducer) {
+    this.hiveConf = hiveConf;
     this.minReducer = minReducer;
     this.maxReducer = maxReducer;
     this.isAutoReduce = isAutoReduce;
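The TezEdgeProperty change is an extract-method refactor: the auto-reduce setup that used to be reachable only through a constructor becomes a setter, which is what lets adjustAutoParallelism() retune an already-built edge via getEdgePropRef(). The same shape in miniature, with simplified stand-in names (none of these are the Hive classes):

    // Constructor delegates to a setter so the same state can also be
    // reconfigured after construction. Names are illustrative.
    public class EdgeConfig {
      private boolean autoReduce;
      private int minReducer, maxReducer;
      private long bytesPerReducer;

      public EdgeConfig(boolean autoReduce, int min, int max, long bytes) {
        setAutoReduce(autoReduce, min, max, bytes); // ctor delegates
      }

      public void setAutoReduce(boolean autoReduce, int min, int max, long bytes) {
        this.autoReduce = autoReduce;
        this.minReducer = min;
        this.maxReducer = max;
        this.bytesPerReducer = bytes;
      }

      public static void main(String[] args) {
        EdgeConfig edge = new EdgeConfig(true, 1, 100, 256L << 20);
        edge.setAutoReduce(true, 38, 100, 256L << 20); // raise the floor later
        System.out.println(edge.minReducer); // 38
      }
    }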