diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9cc7987..56a9d82 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1119,6 +1119,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVETEZCONTAINERSIZE("hive.tez.container.size", -1,
         "By default Tez will spawn containers of the size of a mapper. This can be used to overwrite."),
 
+    HIVETEZCONTAINERSIZE_RESOURCE_CALCULATOR("hive.tez.container.resource.calculator", "",
+        "Class name implementing org.apache.hadoop.hive.ql.exec.tez.ResourceCalculator, " +
+        "which is used to adjust required resource for tez task."),
+
     HIVETEZCPUVCORES("hive.tez.cpu.vcores", -1,
         "By default Tez will ask for however many cpus map-reduce is configured to use per container.\n" +
         "This can be used to overwrite."),
diff --git a/common/src/java/org/apache/hive/common/util/ReflectionUtil.java b/common/src/java/org/apache/hive/common/util/ReflectionUtil.java
index 05aebc2..dbd11fc 100644
--- a/common/src/java/org/apache/hive/common/util/ReflectionUtil.java
+++ b/common/src/java/org/apache/hive/common/util/ReflectionUtil.java
@@ -27,6 +27,7 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.hive.common.JavaUtils;
 
 /**
  * Same as Hadoop ReflectionUtils, but (1) does not leak classloaders (or shouldn't anyway, we
@@ -64,6 +65,24 @@
     configureMethod = configureMethodLocal;
   }
 
+  /**
+   * Create an object for the class with the given name and initialize it from conf.
+   * The class is resolved with JavaUtils.loadClass; the created object is cast,
+   * unchecked, to the type expected by the caller.
+   * @param className name of the class of which an object is created
+   * @param conf Configuration
+   * @return a new object
+   * @throws RuntimeException wrapping ClassNotFoundException if the class cannot be loaded
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T newInstance(String className, Configuration conf) {
+    try {
+      return (T)newInstance(JavaUtils.loadClass(className), conf);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Create an object for the given class and initialize it from conf
    * @param theClass class of which an object is created
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index f773cb9..34b20e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -91,6 +91,7 @@
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -421,7 +422,7 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration
    * from yarn. Falls back to Map-reduce's map size if tez
    * container size isn't set.
    */
-  public static Resource getContainerResource(Configuration conf) {
+  private Resource getContainerResource(Configuration conf) {
     int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
       HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
       conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
@@ -431,6 +432,25 @@ public static Resource getContainerResource(Configuration conf) {
     return Resource.newInstance(memory, cpus);
   }
 
+  /**
+   * Creates the ResourceCalculator configured through
+   * HiveConf.ConfVars.HIVETEZCONTAINERSIZE_RESOURCE_CALCULATOR. Returns a
+   * ResourceCalculator.DefaultCalculator if no class name is set or if the
+   * configured class cannot be instantiated (the failure is logged as a warning).
+   */
+  private ResourceCalculator getAllocator(Configuration conf) {
+    String className = HiveConf.getVar(
+        conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE_RESOURCE_CALCULATOR);
+    if (className != null && !className.isEmpty()) {
+      try {
+        return ReflectionUtil.newInstance(className, conf);
+      } catch (Exception e) {
+        LOG.warn("Failed to instantiate resource calculator " + className, e);
+      }
+    }
+    return new ResourceCalculator.DefaultCalculator();
+  }
+
   /*
    * Helper to setup default environment for a task in YARN.
    */
@@ -619,13 +639,17 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
       Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
     }
 
-    UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
     String procClassName = MapTezProcessor.class.getName();
     if (mapWork instanceof MergeFileWork) {
       procClassName = MergeFileTezProcessor.class.getName();
     }
+
+    // the configured calculator may adjust the base container resource for this map work
+    Resource resource = getAllocator(conf).adjust(getContainerResource(conf), mapWork);
+    LOG.info(mapWork.getName() + " requests resource " + resource.toString());
+
     map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
-        .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+        .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), numTasks, resource);
 
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
@@ -680,11 +704,15 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
     Utilities.createTmpDirs(conf, reduceWork);
 
     // create the vertex
+    // the configured calculator may adjust the base container resource for this reduce work
+    Resource resource = getAllocator(conf).adjust(getContainerResource(conf), reduceWork);
+    LOG.info(reduceWork.getName() + " requests resource " + resource.toString());
+
     Vertex reducer = Vertex.create(reduceWork.getName(),
         ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
         setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
             reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
-                .getNumReduceTasks(), getContainerResource(conf));
+                .getNumReduceTasks(), resource);
 
     reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
     reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
@@ -736,16 +764,12 @@ public PreWarmVertex createPreWarmVertex(TezConfiguration conf,
       int numContainers, Map<String, LocalResource> localResources) throws
       IOException, TezException {
 
+    Resource containerResource = getContainerResource(conf);
     ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create(HivePreWarmProcessor.class.getName());
     prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
-    PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
-
-    Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
-
-    if (localResources != null) {
-      combinedResources.putAll(localResources);
-    }
+    PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm",
+        prewarmProcDescriptor, numContainers, containerResource);
 
     prewarmVertex.addTaskLocalFiles(localResources);
     prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ResourceCalculator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ResourceCalculator.java
new file mode 100644
index 0000000..71678a0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ResourceCalculator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/** Adjusts the container resource for tez tasks, separately for map work and reduce work. */
+public interface ResourceCalculator {
+  /** Returns the (possibly adjusted) container resource for the given map work. */
+  Resource adjust(Resource resource, MapWork mapWork);
+  /** Returns the (possibly adjusted) container resource for the given reduce work. */
+  Resource adjust(Resource resource, ReduceWork reduceWork);
+
+  /** Pass-through implementation: both overloads return the resource they were given. */
+  class DefaultCalculator implements ResourceCalculator {
+
+    @Override
+    public Resource adjust(Resource resource, MapWork mapWork) {
+      return resource;
+    }
+
+    @Override
+    public Resource adjust(Resource resource, ReduceWork reduceWork) {
+      return resource;
+    }
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SimpleCalculator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SimpleCalculator.java
new file mode 100644
index 0000000..ac55d58
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SimpleCalculator.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.Collection;
+
+/** Example calculator: halves the memory of the resource when useLightContainer holds. */
+public class SimpleCalculator implements ResourceCalculator {
+  // note: both overloads modify (setMemory) and return the Resource instance they were given
+  @Override
+  public Resource adjust(Resource resource, MapWork mapWork) {
+    if (useLightContainer(mapWork.getAllRootOperators())) {
+      resource.setMemory(resource.getMemory() / 2);
+    }
+    return resource;
+  }
+
+  @Override
+  public Resource adjust(Resource resource, ReduceWork reduceWork) {
+    if (useLightContainer(reduceWork.getAllRootOperators())) {
+      resource.setMemory(resource.getMemory() / 2);
+    }
+    return resource;
+  }
+
+  // true when findOperators returns no GroupByOperator, CommonJoinOperator or PTFOperator
+  private boolean useLightContainer(Collection<Operator<?>> operators) {
+    return OperatorUtils.findOperators(operators, GroupByOperator.class).isEmpty() &&
+        OperatorUtils.findOperators(operators, CommonJoinOperator.class).isEmpty() &&
+        OperatorUtils.findOperators(operators, PTFOperator.class).isEmpty();
+  }
+}