diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8724930..cc17c73 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2844,7 +2844,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Whether to generate consistent split locations when generating splits in the AM"), HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"), HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez/Spark (Hadoop 2 only)"), - + HIVE_PREWARM_SPARK_TIMEOUT("hive.prewarm.spark.timeout", "5000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Time to wait to finish prewarming Spark executors"), HIVESTAGEIDREARRANGE("hive.stageid.rearrange", "none", new StringSet("none", "idonly", "traverse", "execution"), ""), HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES("hive.explain.dependency.append.tasktype", false, ""), diff --git data/conf/spark/standalone/hive-site.xml data/conf/spark/standalone/hive-site.xml index a3b52c7..f97cd38 100644 --- data/conf/spark/standalone/hive-site.xml +++ data/conf/spark/standalone/hive-site.xml @@ -205,6 +205,16 @@ + hive.prewarm.enabled + true + + + + hive.prewarm.spark.timeout + 4min + + + spark.serializer org.apache.spark.serializer.KryoSerializer diff --git data/conf/spark/yarn-client/hive-site.xml data/conf/spark/yarn-client/hive-site.xml index 9cda40d..1d12808 100644 --- data/conf/spark/yarn-client/hive-site.xml +++ data/conf/spark/yarn-client/hive-site.xml @@ -260,6 +260,16 @@ + hive.prewarm.enabled + true + + + + hive.prewarm.spark.timeout + 4min + + + spark.testing true diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index f6f35d3..b367732 100644 --- 
itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -1116,7 +1116,7 @@ public String cliInit(String tname, boolean recreate) throws Exception { HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, "org.apache.hadoop.hive.ql.security.DummyAuthenticator"); Utilities.clearWorkMap(conf); - CliSessionState ss = createSessionState(); + CliSessionState ss = new CliSessionState(conf); assert ss != null; ss.in = System.in; @@ -1176,33 +1176,6 @@ public String cliInit(String tname, boolean recreate) throws Exception { return outf.getAbsolutePath(); } - private CliSessionState createSessionState() { - return new CliSessionState(conf) { - @Override - public void setSparkSession(SparkSession sparkSession) { - super.setSparkSession(sparkSession); - if (sparkSession != null) { - try { - // Wait a little for cluster to init, at most 4 minutes - long endTime = System.currentTimeMillis() + 240000; - int expectedCores = conf.getInt("spark.executor.instances", 1) * 2; - while (sparkSession.getMemoryAndCores().getSecond() < expectedCores) { - if (System.currentTimeMillis() >= endTime) { - String msg = "Timed out waiting for Spark cluster to init"; - throw new IllegalStateException(msg); - } - Thread.sleep(100); - } - } catch (Exception e) { - String msg = "Error trying to obtain executor info: " + e; - LOG.error(msg, e); - throw new IllegalStateException(msg, e); - } - } - } - }; - } - private CliSessionState startSessionState(boolean canReuseSession) throws IOException { @@ -1211,7 +1184,7 @@ private CliSessionState startSessionState(boolean canReuseSession) String execEngine = conf.get("hive.execution.engine"); conf.set("hive.execution.engine", "mr"); - CliSessionState ss = createSessionState(); + CliSessionState ss = new CliSessionState(conf); assert ss != null; ss.in = System.in; ss.out = System.out; diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 78d5ff2..6faec83 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -76,7 +76,6 @@ private static final String MR_JAR_PROPERTY = "tmpjars"; private static final String MR_CREDENTIALS_LOCATION_PROPERTY = "mapreduce.job.credentials.binary"; private static final transient Logger LOG = LoggerFactory.getLogger(RemoteHiveSparkClient.class); - private static final long MAX_PREWARM_TIME = 5000; // 5s private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings(); private transient Map conf; @@ -110,12 +109,14 @@ private void createRemoteClient() throws Exception { LOG.info("Prewarm Spark executors. The minimum number of executors to warm is " + minExecutors); - // Spend at most MAX_PREWARM_TIME to wait for executors to come up. + // Spend at most HIVE_PREWARM_SPARK_TIMEOUT to wait for executors to come up. int curExecutors = 0; + long maxPrewarmTime = HiveConf.getTimeVar(hiveConf, ConfVars.HIVE_PREWARM_SPARK_TIMEOUT, + TimeUnit.MILLISECONDS); long ts = System.currentTimeMillis(); do { try { - curExecutors = getExecutorCount(MAX_PREWARM_TIME, TimeUnit.MILLISECONDS); + curExecutors = getExecutorCount(maxPrewarmTime, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // let's don't fail on future timeout since we have a timeout for pre-warm LOG.warn("Timed out getting executor count.", e); @@ -125,9 +126,9 @@ private void createRemoteClient() throws Exception { return; } Thread.sleep(500); // sleep half a second - } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME); + } while (System.currentTimeMillis() - ts < maxPrewarmTime); - LOG.info("Timeout (" + MAX_PREWARM_TIME / 1000 + "s) occurred while prewarming executors. 
" + + LOG.info("Timeout (" + maxPrewarmTime / 1000 + "s) occurred while prewarming executors. " + "The current number of executors is " + curExecutors); } }