diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0fcd39b..a2238b4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1182,6 +1182,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVETEZCONTAINERSIZE("hive.tez.container.size", -1,
         "By default Tez will spawn containers of the size of a mapper. This can be used to overwrite."),
+    HIVETEZCONTAINERSIZE_RESOURCE_CALCULATOR("hive.tez.container.resource.calculator", "",
+        "Class name implementing org.apache.hadoop.hive.ql.exec.tez.ResourceCalculator, " +
+        "which is used to adjust required resource for tez task."),
+
     HIVETEZCPUVCORES("hive.tez.cpu.vcores", -1,
         "By default Tez will ask for however many cpus map-reduce is configured to use per container.\n" +
         "This can be used to overwrite."),
diff --git a/common/src/java/org/apache/hive/common/util/ReflectionUtil.java b/common/src/java/org/apache/hive/common/util/ReflectionUtil.java
index 7059309..efd131d 100644
--- a/common/src/java/org/apache/hive/common/util/ReflectionUtil.java
+++ b/common/src/java/org/apache/hive/common/util/ReflectionUtil.java
@@ -27,6 +27,7 @@
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.hive.common.JavaUtils;
 
 /**
  * Same as Hadoop ReflectionUtils, but (1) does not leak classloaders (or shouldn't anyway, we
@@ -64,6 +65,15 @@
     configureMethod = configureMethodLocal;
   }
 
+  // Instantiates className (loaded via JavaUtils.loadClass) and configures it from conf.
+  // NOTE: the <T> type parameter was stripped in the mangled patch text; without it the
+  // (T) cast cannot compile, so it is restored here.
+  @SuppressWarnings("unchecked")
+  public static <T> T newInstance(String className, Configuration conf) {
+    try {
+      return (T) newInstance(JavaUtils.loadClass(className), conf);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Create an object for the given class and initialize it from conf
    * @param theClass class of which an object is created
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index d250b82..05f1192 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -92,6 +92,7 @@
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -182,27 +183,27 @@ private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork
 
     if (mapWork.getNumMapTasks() != null) {
       // Is this required ?
-      conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
+      conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks());
     }
 
     if (mapWork.getMaxSplitSize() != null) {
       HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE,
-          mapWork.getMaxSplitSize().longValue());
+          mapWork.getMaxSplitSize());
     }
 
     if (mapWork.getMinSplitSize() != null) {
       HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE,
-          mapWork.getMinSplitSize().longValue());
+          mapWork.getMinSplitSize());
     }
 
     if (mapWork.getMinSplitSizePerNode() != null) {
       HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE,
-          mapWork.getMinSplitSizePerNode().longValue());
+          mapWork.getMinSplitSizePerNode());
     }
 
     if (mapWork.getMinSplitSizePerRack() != null) {
       HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK,
-          mapWork.getMinSplitSizePerRack().longValue());
+          mapWork.getMinSplitSizePerRack());
     }
 
     Utilities.setInputAttributes(conf, mapWork);
@@ -423,7 +424,7 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration
    * from yarn. Falls back to Map-reduce's map size if tez
    * container size isn't set.
*/ - public static Resource getContainerResource(Configuration conf) { + private Resource getContainerResource(Configuration conf) { int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ? HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) : conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB); @@ -433,6 +434,19 @@ public static Resource getContainerResource(Configuration conf) { return Resource.newInstance(memory, cpus); } + private ResourceCalculator getAllocator(Configuration conf) { + String className = HiveConf.getVar( + conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE_RESOURCE_CALCULATOR); + if (className != null && !className.isEmpty()) { + try { + return ReflectionUtil.newInstance(className, conf); + } catch (Exception e) { + LOG.warn("Failed to instantiate resource calculator " + className); + } + } + return new ResourceCalculator.DefaultCalculator(); + } + /* * Helper to setup default environment for a task in YARN. */ @@ -563,11 +577,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, } else { // we'll set up tez to combine spits for us iff the input format // is HiveInputFormat - if (inputFormatClass == HiveInputFormat.class) { - groupSplitsInInputInitializer = true; - } else { - groupSplitsInInputInitializer = false; - } + groupSplitsInInputInitializer = inputFormatClass == HiveInputFormat.class; } if (mapWork instanceof MergeFileWork) { @@ -621,16 +631,17 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, Utilities.setMapWork(conf, mapWork, mrScratchDir, false); } - UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf); String procClassName = MapTezProcessor.class.getName(); if (mapWork instanceof MergeFileWork) { procClassName = MergeFileTezProcessor.class.getName(); } VertexExecutionContext executionContext = createVertexExecutionContext(mapWork); + Resource resource = getAllocator(conf).adjust(getContainerResource(conf), mapWork); + 
LOG.info(mapWork.getName() + " requests resource " + resource.toString()); map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName) - .setUserPayload(serializedConf), numTasks, getContainerResource(conf)); + .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), numTasks, resource); map.setTaskEnvironment(getContainerEnvironment(conf, true)); map.setExecutionContext(executionContext); @@ -701,12 +712,15 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork, VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(reduceWork); // create the vertex + Resource resource = getAllocator(conf).adjust(getContainerResource(conf), reduceWork); + LOG.info(reduceWork.getName() + " requests resource " + resource.toString()); + Vertex reducer = Vertex.create(reduceWork.getName(), ProcessorDescriptor.create(ReduceTezProcessor.class.getName()). setUserPayload(TezUtils.createUserPayloadFromConf(conf)), reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : - reduceWork.getNumReduceTasks(), getContainerResource(conf)); + reduceWork.getNumReduceTasks(), resource); reducer.setTaskEnvironment(getContainerEnvironment(conf, false)); reducer.setExecutionContext(vertexExecutionContext); @@ -726,14 +740,9 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork, * Helper method to create a yarn local resource. 
*/ private LocalResource createLocalResource(FileSystem remoteFs, Path file, - LocalResourceType type, LocalResourceVisibility visibility) { + LocalResourceType type, LocalResourceVisibility visibility) throws IOException { - FileStatus fstat = null; - try { - fstat = remoteFs.getFileStatus(file); - } catch (IOException e) { - e.printStackTrace(); - } + FileStatus fstat = remoteFs.getFileStatus(file); URL resourceURL = ConverterUtils.getYarnUrlFromPath(file); long resourceSize = fstat.getLen(); @@ -759,16 +768,13 @@ public PreWarmVertex createPreWarmVertex(TezConfiguration conf, int numContainers, Map localResources) throws IOException, TezException { - ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create(HivePreWarmProcessor.class.getName()); + Resource containerResource = getContainerResource(conf); + ProcessorDescriptor prewarmProcDescriptor = + ProcessorDescriptor.create(HivePreWarmProcessor.class.getName()); prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); - PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf)); - - Map combinedResources = new HashMap(); - - if (localResources != null) { - combinedResources.putAll(localResources); - } + PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", + prewarmProcDescriptor, numContainers, containerResource); prewarmVertex.addTaskLocalFiles(localResources); prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ResourceCalculator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ResourceCalculator.java new file mode 100644 index 0000000..71678a0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ResourceCalculator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.yarn.api.records.Resource; + +// adjusts container resource for tez tasks +public interface ResourceCalculator { + + Resource adjust(Resource resource, MapWork mapWork); + + Resource adjust(Resource resource, ReduceWork reduceWork); + + // dummy implementation + class DefaultCalculator implements ResourceCalculator { + + @Override + public Resource adjust(Resource resource, MapWork mapWork) { + return resource; + } + + @Override + public Resource adjust(Resource resource, ReduceWork reduceWork) { + return resource; + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SimpleCalculator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SimpleCalculator.java new file mode 100644 index 0000000..ac55d58 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SimpleCalculator.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.PTFOperator; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.yarn.api.records.Resource; + +import java.util.Collection; + +// example +public class SimpleCalculator implements ResourceCalculator { + + @Override + public Resource adjust(Resource resource, MapWork mapWork) { + if (useLightContainer(mapWork.getAllRootOperators())) { + resource.setMemory(resource.getMemory() / 2); + } + return resource; + } + + @Override + public Resource adjust(Resource resource, ReduceWork reduceWork) { + if (useLightContainer(reduceWork.getAllRootOperators())) { + resource.setMemory(resource.getMemory() / 2); + } + return resource; + } + + private boolean useLightContainer(Collection> operators) { + return + OperatorUtils.findOperators(operators, GroupByOperator.class).isEmpty() && + OperatorUtils.findOperators(operators, CommonJoinOperator.class).isEmpty() && + OperatorUtils.findOperators(operators, PTFOperator.class).isEmpty(); + } +}