commit 9fd281a180944e493dc2cb7db81db71dbc539896 Author: Bharath Krishna Date: Mon Sep 10 20:48:58 2018 -0700 HIVE-19814 : RPC Server port is always random for spark diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 0aae0d8205..a49e72d327 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.spark.client.SparkClientUtilities; +import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -198,7 +199,12 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sp LOG.debug(String.format( "Pass Oozie configuration (%s -> %s).", propertyName, LogUtils.maskIfPassword(propertyName,value))); } - + if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) { + String value = RpcConfiguration.getValue(hiveConf, propertyName); + sparkConf.put(propertyName, value); + LOG.debug(String.format("load RPC property from hive configuration (%s -> %s).", propertyName, + LogUtils.maskIfPassword(propertyName, value))); + } } final boolean optShuffleSerDe = hiveConf.getBoolVar( diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java index 853e4f48c8..3882b58ba7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.spark.client.SparkClientFactory; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -31,6 +33,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeoutException; @@ -214,6 +217,38 @@ public void testGetSessionId() throws HiveException { assertEquals("0", ss.getSparkSession().getSessionId()); } + @Test + public void testConfigsForInitialization() { + //Test to make sure that configs listed in RpcConfiguration.HIVE_SPARK_RSC_CONFIGS which are passed + // through HiveConf are included in the Spark configuration. + HiveConf hiveConf = getHiveConf(); + hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333"); + hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS, "test-rpc-server-address"); + Map sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null); + assertEquals("49152-49222,49223,49224-49333", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname)); + assertEquals("test-rpc-server-address", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname)); + } + + @Test + public void testServerPortAssignment() throws Exception { + HiveConf conf = getHiveConf(); + conf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333"); + SparkSessionManagerImpl testSessionManager = SparkSessionManagerImpl.getInstance(); + testSessionManager.setup(conf); + + assertTrue("Port should be within configured port range:" + SparkClientFactory.getServerPort(), + SparkClientFactory.getServerPort() >= 49152 && SparkClientFactory.getServerPort() <= 49333); + + //Verify that new spark session can be created to ensure that new SparkSession + // is successfully able to connect to the RpcServer with custom port. + try { + testSessionManager.getSession(null, conf, true); + } catch (HiveException e) { + Assert.fail("Failed test to connect to the RpcServer with custom port"); + } + + testSessionManager.shutdown(); + } private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) { checkHiveException(ss, e, expectedErrMsg, null); } diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index 54ecdf08e1..640d058587 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.spark.client.rpc.RpcServer; @@ -94,4 +95,9 @@ public static SparkClient createClient(Map sparkConf, HiveConf h HiveConf.HIVE_SPARK_SUBMIT_CLIENT + " or " + HiveConf.HIVE_SPARK_LAUNCHER_CLIENT); } } + + @VisibleForTesting + public static int getServerPort() { + return server.getPort(); + } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index bd3a7a7321..eb824efdfb 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -46,14 +46,15 @@ private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class); public static final ImmutableSet HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of( - HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, - HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, - HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, - HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, - HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname + HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, + HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname ); public static final ImmutableSet HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of( HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,