diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 47db0c0..534eb7d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3560,8 +3560,7 @@ private boolean isSparkRelatedConfig(String name) {
       result = !name.equals("spark.app.name");
     } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
       String sparkMaster = get("spark.master");
-      if (sparkMaster != null &&
-          (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+      if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
         result = true;
       }
     } else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
diff --git a/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java b/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
index f88573f..7f6175b 100644
--- a/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
+++ b/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
@@ -150,7 +150,7 @@ public void testSparkConfigUpdate(){
     HiveConf conf = new HiveConf();
     Assert.assertFalse(conf.getSparkConfigUpdated());

-    conf.verifyAndSet("spark.master", "yarn-cluster");
+    conf.verifyAndSet("spark.master", "yarn");
     Assert.assertTrue(conf.getSparkConfigUpdated());
     conf.verifyAndSet("hive.execution.engine", "spark");
     Assert.assertTrue("Expected spark config updated.", conf.getSparkConfigUpdated());
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index 4e63245..9cda40d 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -201,7 +201,12 @@

  <property>
    <name>spark.master</name>
-    <value>yarn-cluster</value>
+    <value>yarn</value>
+  </property>
+
+  <property>
+    <name>spark.submit.deployMode</name>
+    <value>cluster</value>
  </property>

  <property>
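Spark 2.x deprecates the composite "yarn-client" and "yarn-cluster" masters in favor of master "yarn" plus spark.submit.deployMode, which is what the hive-site.xml change above encodes. A quick sketch of the mapping this patch adopts (illustrative snippet, not part of the patch; it uses the new SparkClientUtilities helpers added at the end of this diff):

    // Legacy spelling          New spelling
    // "yarn-client"    =>      master "yarn" + spark.submit.deployMode "client"
    // "yarn-cluster"   =>      master "yarn" + spark.submit.deployMode "cluster"
    assert SparkClientUtilities.isYarnClientMode("yarn", "client");
    assert SparkClientUtilities.isYarnClusterMode("yarn", "cluster");
    assert SparkClientUtilities.isYarnClusterMode("yarn-cluster", null); // legacy spelling still accepted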
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index d71a84c..1e97cd4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -52,11 +53,13 @@
   protected static final transient Logger LOG = LoggerFactory.getLogger(HiveSparkClientFactory.class);

   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
-  private static final String SPARK_DEFAULT_MASTER = "yarn-cluster";
+  private static final String SPARK_DEFAULT_MASTER = "yarn";
+  private static final String SPARK_DEFAULT_DEPLOY_MODE = "cluster";
   private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
   private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
   private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
   private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
+  private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";

   public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
     Map<String, String> sparkConf = initiateSparkConf(hiveconf);
@@ -125,10 +128,27 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
       sparkMaster = sparkConf.get("spark.master");
       hiveConf.set("spark.master", sparkMaster);
     }
+    String deployMode = null;
+    if (!SparkClientUtilities.isLocalMaster(sparkMaster)) {
+      deployMode = hiveConf.get(SPARK_DEPLOY_MODE);
+      if (deployMode == null) {
+        deployMode = sparkConf.get(SPARK_DEPLOY_MODE);
+        if (deployMode == null) {
+          deployMode = SparkClientUtilities.getDeployModeFromMaster(sparkMaster);
+        }
+        if (deployMode == null) {
+          deployMode = SPARK_DEFAULT_DEPLOY_MODE;
+        }
+        hiveConf.set(SPARK_DEPLOY_MODE, deployMode);
+      }
+    }
     if (SessionState.get() != null && SessionState.get().getConf() != null) {
       SessionState.get().getConf().set("spark.master", sparkMaster);
+      if (deployMode != null) {
+        SessionState.get().getConf().set(SPARK_DEPLOY_MODE, deployMode);
+      }
     }
-    if (sparkMaster.equals("yarn-cluster")) {
+    if (SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode)) {
       sparkConf.put("spark.yarn.maxAppAttempts", "1");
     }
     for (Map.Entry<String, String> entry : hiveConf) {
@@ -140,7 +160,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
           "load spark property from hive configuration (%s -> %s).",
           propertyName, LogUtils.maskIfPassword(propertyName,value)));
       } else if (propertyName.startsWith("yarn") &&
-        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+        SparkClientUtilities.isYarnMaster(sparkMaster)) {
         String value = hiveConf.get(propertyName);
         // Add spark.hadoop prefix for yarn properties as SparkConf only accept properties
         // started with spark prefix, Spark would remove spark.hadoop prefix lately and add
@@ -184,7 +204,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex

     // set yarn queue name
     final String sparkQueueNameKey = "spark.yarn.queue";
-    if (sparkMaster.startsWith("yarn") && hiveConf.get(sparkQueueNameKey) == null) {
+    if (SparkClientUtilities.isYarnMaster(sparkMaster) && hiveConf.get(sparkQueueNameKey) == null) {
       String queueName = hiveConf.get("mapreduce.job.queuename");
       if (queueName != null) {
         sparkConf.put(sparkQueueNameKey, queueName);
@@ -192,7 +212,8 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
     }

     // Disable it to avoid verbose app state report in yarn-cluster mode
-    if (sparkMaster.equals("yarn-cluster") && sparkConf.get(SPARK_WAIT_APP_COMPLETE) == null) {
+    if (SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode) &&
+        sparkConf.get(SPARK_WAIT_APP_COMPLETE) == null) {
       sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false");
     }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index a705dfc..6caf2b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -100,7 +100,7 @@ private void createRemoteClient() throws Exception {
     remoteClient = SparkClientFactory.createClient(conf, hiveConf);

     if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) &&
-        hiveConf.get("spark.master").startsWith("yarn-")) {
+        SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master"))) {
       int minExecutors = getExecutorsToWarm();
       if (minExecutors <= 0) {
         return;
@@ -172,8 +172,10 @@ public int getDefaultParallelism() throws Exception {
   }

   @Override
-  public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
-    if (hiveConf.get("spark.master").startsWith("yarn-") && !remoteClient.isActive()) {
+  public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork)
+      throws Exception {
+    if (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) &&
+        !remoteClient.isActive()) {
       // Re-create the remote client if not active any more
       close();
       createRemoteClient();
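In the HiveSparkClientFactory hunk above, the deploy mode is resolved with a fixed precedence: an explicit spark.submit.deployMode in HiveConf wins, then one from spark-defaults.conf, then the mode implied by a legacy composite master, and finally the "cluster" default. A condensed sketch of that chain (illustration only; the patch inlines it in the factory):

    // Precedence: HiveConf > spark-defaults.conf > legacy master suffix > "cluster".
    String deployMode = hiveConf.get("spark.submit.deployMode");
    if (deployMode == null) {
      deployMode = sparkConf.get("spark.submit.deployMode");
    }
    if (deployMode == null) {
      // Maps "yarn-client" -> "client", "yarn-cluster" -> "cluster", anything else -> null.
      deployMode = SparkClientUtilities.getDeployModeFromMaster(sparkMaster);
    }
    if (deployMode == null) {
      deployMode = "cluster"; // SPARK_DEFAULT_DEPLOY_MODE
    }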
(hiveConf.get("spark.master").startsWith("yarn-") && !remoteClient.isActive()) { + public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) + throws Exception { + if (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) && + !remoteClient.isActive()) { // Re-create the remote client if not active any more close(); createRemoteClient(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index 630b598..7d18c0a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.BytesWritable; +import org.apache.hive.spark.client.SparkClientUtilities; import org.apache.spark.Dependency; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; @@ -87,7 +88,10 @@ public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException { // checks if a resource has to be uploaded to HDFS for yarn-cluster mode public static boolean needUploadToHDFS(URI source, SparkConf sparkConf) { - return sparkConf.get("spark.master").equals("yarn-cluster") && + String master = sparkConf.get("spark.master"); + String deployMode = sparkConf.contains("spark.submit.deployMode") ? + sparkConf.get("spark.submit.deployMode") : null; + return SparkClientUtilities.isYarnClusterMode(master, deployMode) && !source.getScheme().equals("hdfs"); } @@ -102,7 +106,7 @@ private static String getFileName(URI uri) { public static boolean isDedicatedCluster(Configuration conf) { String master = conf.get("spark.master"); - return master.startsWith("yarn-") || master.startsWith("local"); + return SparkClientUtilities.isYarnMaster(master) || SparkClientUtilities.isLocalMaster(master); } public static SparkSession getSparkSession(HiveConf conf, diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 5191e1f..0da40dd 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -333,6 +333,7 @@ public void run() { // SparkSubmit will take care of that for us. 
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 6251861..9ef3f38 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -106,4 +106,33 @@ private static URL urlFromPathString(String path, Long timeStamp,
     }
     return url;
   }
+
+  public static boolean isYarnClusterMode(String master, String deployMode) {
+    return "yarn-cluster".equals(master) ||
+        ("yarn".equals(master) && "cluster".equals(deployMode));
+  }
+
+  public static boolean isYarnClientMode(String master, String deployMode) {
+    return "yarn-client".equals(master) ||
+        ("yarn".equals(master) && "client".equals(deployMode));
+  }
+
+  public static boolean isYarnMaster(String master) {
+    return master != null && master.startsWith("yarn");
+  }
+
+  public static boolean isLocalMaster(String master) {
+    return master != null && master.startsWith("local");
+  }
+
+  public static String getDeployModeFromMaster(String master) {
+    if (master != null) {
+      if (master.equals("yarn-client")) {
+        return "client";
+      } else if (master.equals("yarn-cluster")) {
+        return "cluster";
+      }
+    }
+    return null;
+  }
 }
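The new SparkClientUtilities helpers are pure functions over the master and deploy-mode strings, so they are easy to unit-test in isolation. A minimal JUnit sketch (hypothetical test class, not included in the patch):

    import org.junit.Assert;
    import org.junit.Test;

    public class TestSparkClientUtilities {
      @Test
      public void testYarnModeDetection() {
        // New-style master plus explicit deploy mode.
        Assert.assertTrue(SparkClientUtilities.isYarnClusterMode("yarn", "cluster"));
        Assert.assertTrue(SparkClientUtilities.isYarnClientMode("yarn", "client"));
        Assert.assertFalse(SparkClientUtilities.isYarnClusterMode("yarn", "client"));
        // Legacy composite masters still work without a deploy mode.
        Assert.assertTrue(SparkClientUtilities.isYarnClusterMode("yarn-cluster", null));
        Assert.assertEquals("client", SparkClientUtilities.getDeployModeFromMaster("yarn-client"));
        // Non-YARN masters.
        Assert.assertFalse(SparkClientUtilities.isYarnMaster("local[4]"));
        Assert.assertTrue(SparkClientUtilities.isLocalMaster("local[4]"));
        Assert.assertNull(SparkClientUtilities.getDeployModeFromMaster("spark://host:7077"));
      }
    }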