commit 772e4be5bab179623fbaf189f57a7b6742ddb371
Author: Bharathkrishna Guruvayoor Murali
Date:   Wed Sep 12 17:50:29 2018 -0700

    HIVE-19814: RPC Server port is always random for spark (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)

    (cherry picked from commit a3b7a2452bacf6d7eeeb42bd9dd68109c90e27a2)

    Resolved Conflicts:
        ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
        ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 6e9ba7c6df..2554e356a8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -188,13 +189,11 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
         LOG.info(String.format(
           "Pass Oozie configuration (%s -> %s).", propertyName, LogUtils.maskIfPassword(propertyName,value)));
       }
-
       if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
         String value = RpcConfiguration.getValue(hiveConf, propertyName);
         sparkConf.put(propertyName, value);
-        LOG.info(String.format(
-          "load RPC property from hive configuration (%s -> %s).",
-          propertyName, LogUtils.maskIfPassword(propertyName,value)));
+        LOG.debug(String.format("load RPC property from hive configuration (%s -> %s).", propertyName,
+            LogUtils.maskIfPassword(propertyName, value)));
       }
     }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 489383baee..c4ce5d3d64 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -17,16 +17,23 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.spark.client.SparkClientFactory;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -97,6 +104,38 @@ public void testMultiSessionMultipleUse() throws Exception {
 
     sessionManagerHS2.shutdown();
   }
+
+  @Test
+  public void testConfigsForInitialization() {
+    //Test to make sure that configs listed in RpcConfiguration.HIVE_SPARK_RSC_CONFIGS which are passed
+    // through HiveConf are included in the Spark configuration.
+    HiveConf hiveConf = getHiveConf();
+    hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333");
+    hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS, "test-rpc-server-address");
+    Map<String, String> sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveConf);
+    assertEquals("49152-49222,49223,49224-49333", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname));
+    assertEquals("test-rpc-server-address", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname));
+  }
+
+  @Test
+  public void testServerPortAssignment() throws Exception {
+    HiveConf conf = getHiveConf();
+    conf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333");
+    SparkSessionManagerImpl testSessionManager = SparkSessionManagerImpl.getInstance();
+    testSessionManager.setup(conf);
+
+    assertTrue("Port should be within configured port range:" + SparkClientFactory.getServerPort(),
+        SparkClientFactory.getServerPort() >= 49152 && SparkClientFactory.getServerPort() <= 49333);
+
+    //Verify that new spark session can be created to ensure that new SparkSession
+    // is successfully able to connect to the RpcServer with custom port.
+    try {
+      testSessionManager.getSession(null, conf, true);
+    } catch (HiveException e) {
+      Assert.fail("Failed test to connect to the RpcServer with custom port");
+    }
+
+    testSessionManager.shutdown();
+  }
 
   /* Thread simulating a user session in HiveServer2. */
   public class SessionThread implements Runnable {
@@ -134,4 +173,12 @@ public void run() {
       }
     }
   }
+
+  private HiveConf getHiveConf() {
+    HiveConf conf = new HiveConf();
+    conf.set("spark.master", "local");
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        "TestSparkSessionManagerImpl-local-dir").toString());
+    return conf;
+  }
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index b23ff2dae8..e6a5bd5678 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.RpcServer;
@@ -80,4 +81,9 @@ public static synchronized SparkClient createClient(Map<String, String> sparkCon
     return new SparkClientImpl(server, sparkConf, hiveConf);
   }
 
+  @VisibleForTesting
+  public static int getServerPort() {
+    return server.getPort();
+  }
+
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 8c5901548f..6536023062 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -45,13 +45,14 @@
   private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class);
 
   public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of(
-    HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
-    HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
-    HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
-    HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
-    HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
-    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
-    HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
+      HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
+      HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
+      HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
+      HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
+      HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname,
+      HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname
   );
   public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
     HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
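For reference, the value exercised above through HiveConf.ConfVars.SPARK_RPC_SERVER_PORT is a comma-separated list of single ports and inclusive port ranges, e.g. 49152-49222,49223,49224-49333. The sketch below only illustrates how such a spec expands into an ordered list of candidate bind ports; the actual parsing and bind logic lives in the spark-client RpcServer and is not shown in this patch, and the class and method names here (PortSpecSketch, expand) are invented purely for illustration.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only (not part of the patch): expands a port spec such as
// "49152-49222,49223,49224-49333" into the ordered list of candidate ports an
// RPC server could try to bind to, moving to the next candidate on failure.
public class PortSpecSketch {

  static List<Integer> expand(String spec) {
    List<Integer> ports = new ArrayList<>();
    for (String part : spec.split(",")) {
      part = part.trim();
      int dash = part.indexOf('-');
      if (dash > 0) {
        // An inclusive range such as "49152-49222".
        int lo = Integer.parseInt(part.substring(0, dash));
        int hi = Integer.parseInt(part.substring(dash + 1));
        for (int port = lo; port <= hi; port++) {
          ports.add(port);
        }
      } else {
        // A single port such as "49223".
        ports.add(Integer.parseInt(part));
      }
    }
    return ports;
  }

  public static void main(String[] args) {
    // Prints all candidates in configuration order: 49152, 49153, ..., 49333.
    System.out.println(expand("49152-49222,49223,49224-49333"));
  }
}

With the spec used in testServerPortAssignment, every candidate falls between 49152 and 49333, which is exactly the bound that the assertTrue on SparkClientFactory.getServerPort() checks.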