diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1fea721..b3f2a39 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2022,7 +2022,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "Check input size, before considering vertex (-1 disables check)"),
     LLAP_AUTO_MAX_OUTPUT("hive.llap.auto.max.output.size", 1*1024*1024*1024L,
         "Check output size, before considering vertex (-1 disables check)"),
-    LLAP_EXECUTION_MODE("hive.llap.execution.mode", "auto",
+    LLAP_EXECUTION_MODE("hive.llap.execution.mode", "none",
         new StringSet("auto", "none", "all", "map"),
         "Chooses whether query fragments will run in container or in llap"),
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index c9029f2..010316f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -623,8 +623,13 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     if (mapWork instanceof MergeFileWork) {
       procClassName = MergeFileTezProcessor.class.getName();
     }
+
+    String serviceName = findServiceName(mapWork);
+
     map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
-        .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+        .setUserPayload(serializedConf), numTasks, getContainerResource(conf))
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName);
 
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
@@ -664,6 +669,20 @@ private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWo
     return conf;
   }
 
+  /**
+   * Picks the service name used as both the task scheduler and the container launcher
+   * of a vertex. Uber mode takes precedence over LLAP mode; otherwise the default is used.
+   */
+  private String findServiceName(BaseWork work) {
+    if (work.getUberMode()) {
+      return TezSessionState.LOCAL_SERVICE;
+    }
+    if (work.getLlapMode()) {
+      return TezSessionState.LLAP_SERVICE;
+    }
+    return TezSessionState.DEFAULT_SERVICE;
+  }
+
   /*
    * Helper function to create Vertex for given ReduceWork.
    */
@@ -678,12 +697,17 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
 
+    String serviceName = findServiceName(reduceWork);
+
     // create the vertex
     Vertex reducer = Vertex.create(reduceWork.getName(),
         ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
         setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
-            reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
-                .getNumReduceTasks(), getContainerResource(conf));
+        reduceWork.isAutoReduceParallelism()
+            ? reduceWork.getMaxReduceTasks()
+            : reduceWork.getNumReduceTasks(), getContainerResource(conf))
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName);
 
     reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
     reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 89286e5..359baf8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -56,6 +56,7 @@
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.dag.api.TezConstants;
 
 /**
  * Holds session state related to Tez
@@ -64,6 +65,13 @@
 
   private static final Log LOG = LogFactory.getLog(TezSessionState.class.getName());
   private static final String TEZ_DIR = "_tez_session_dir";
+  public static final String LLAP_SERVICE = "LLAP";
+  public static final String DEFAULT_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+  public static final String LOCAL_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT;
+  private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.DaemonTaskSchedulerService";
+  private static final String LLAP_LAUNCHER = "org.apache.tez.dag.app.launcher.DaemonContainerLauncher";
+  private static final String LLAP_SERVICE_SCHEDULER = LLAP_SERVICE + ":" + LLAP_SCHEDULER;
+  private static final String LLAP_SERVICE_LAUNCHER = LLAP_SERVICE + ":" + LLAP_LAUNCHER;
 
   private HiveConf conf;
   private Path tezScratchDir;
@@ -171,8 +179,18 @@ public void open(HiveConf conf, String[] additionalFiles)
     // and finally we're ready to create and start the session
     // generate basic tez config
     TezConfiguration tezConfig = new TezConfiguration(conf);
+
+    // set up the staging directory to use
     tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
 
+    // we need plugins to handle llap and uber mode
+    tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+        DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_SCHEDULER);
+
+    tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+        DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER);
+
+    // container prewarming. tell the am how many containers we need
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
       int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
       n = Math.max(tezConfig.getInt(
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index e1aed9c..faeb5ec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -62,6 +62,7 @@ public BaseWork(String name) {
   protected Map> allColumnVectorMaps = null;
 
   protected boolean llapMode = false;
+  protected boolean uberMode = false;
   protected boolean vectorMode = false;
 
   public void setGatheringStats(boolean gatherStats) {
@@ -186,6 +187,14 @@ public boolean getVectorMode() {
     return vectorMode;
   }
 
+  public void setUberMode(boolean uberMode) {
+    this.uberMode = uberMode;
+  }
+
+  public boolean getUberMode() {
+    return uberMode;
+  }
+
   public void setLlapMode(boolean llapMode) {
     this.llapMode = llapMode;
   }
diff --git ql/src/test/queries/clientpositive/llapdecider.q ql/src/test/queries/clientpositive/llapdecider.q
index e8e8770..87a8e19 100644
--- ql/src/test/queries/clientpositive/llapdecider.q
+++ ql/src/test/queries/clientpositive/llapdecider.q
@@ -1,4 +1,5 @@
 set hive.stats.fetch.column.stats=true;
+set hive.llap.execution.mode=auto;
 
 -- simple query with multiple reduce stages
 EXPLAIN SELECT key, count(value) as cnt FROM src GROUP BY key ORDER BY cnt;
@@ -50,10 +51,10 @@ set hive.llap.execution.mode=all;
 
 EXPLAIN SELECT * from src_orc s1 join src_orc s2 on (s1.key = s2.key) order by s2.value;
 
-set hive.llap.execution.mode=auto;
-
 CREATE TEMPORARY FUNCTION test_udf_get_java_string AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFTestGetJavaString';
 
+set hive.llap.execution.mode=auto;
+
 EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(key as int) > 1;
 EXPLAIN SELECT sum(cast(test_udf_get_java_string(cast(key as string)) as int) + 1) from src_orc where cast(key as int) > 1;
 EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(test_udf_get_java_string(cast(key as string)) as int) > 1;