commit a56791c3086e786caa5ec482104d03f16bafabdf Author: Bharathkrishna Guruvayoor Murali Date: Wed Sep 12 17:50:29 2018 -0700 HIVE-19814: RPC Server port is always random for spark (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar) (cherry picked from commit a3b7a2452bacf6d7eeeb42bd9dd68109c90e27a2) Resolved Conflicts: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index c22fb8923d..c6a27fa12d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.spark.client.SparkClientUtilities; +import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -198,13 +199,11 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se LOG.debug(String.format( "Pass Oozie configuration (%s -> %s).", propertyName, LogUtils.maskIfPassword(propertyName,value))); } - if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) { String value = RpcConfiguration.getValue(hiveConf, propertyName); sparkConf.put(propertyName, value); - LOG.debug(String.format( - "load RPC property from hive configuration (%s -> %s).", - propertyName, LogUtils.maskIfPassword(propertyName,value))); + LOG.debug(String.format("load RPC property from hive configuration (%s -> %s).", propertyName, + LogUtils.maskIfPassword(propertyName, value))); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java index fe95ce0a85..52263b1a59 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java @@ -18,14 +18,18 @@ package org.apache.hadoop.hive.ql.exec.spark.session; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hive.spark.client.SparkClientFactory; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.util.StringUtils; import org.junit.Test; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeoutException; @@ -187,6 +191,38 @@ public void testGetHiveException() throws Exception { checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR); } + @Test + public void testConfigsForInitialization() { + //Test to make sure that configs listed in RpcConfiguration.HIVE_SPARK_RSC_CONFIGS which are passed + // through HiveConf are included in the Spark configuration. + HiveConf hiveConf = getHiveConf(); + hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333"); + hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS, "test-rpc-server-address"); + Map sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null); + assertEquals("49152-49222,49223,49224-49333", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname)); + assertEquals("test-rpc-server-address", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname)); + } + + @Test + public void testServerPortAssignment() throws Exception { + HiveConf conf = getHiveConf(); + conf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333"); + SparkSessionManagerImpl testSessionManager = SparkSessionManagerImpl.getInstance(); + testSessionManager.setup(conf); + + assertTrue("Port should be within configured port range:" + SparkClientFactory.getServerPort(), + SparkClientFactory.getServerPort() >= 49152 && SparkClientFactory.getServerPort() <= 49333); + + //Verify that new spark session can be created to ensure that new SparkSession + // is successfully able to connect to the RpcServer with custom port. + try { + testSessionManager.getSession(null, conf, true); + } catch (HiveException e) { + Assert.fail("Failed test to connect to the RpcServer with custom port"); + } + + testSessionManager.shutdown(); + } private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) { checkHiveException(ss, e, expectedErrMsg, null); } @@ -256,4 +292,12 @@ public void run() { } } } + + private HiveConf getHiveConf() { + HiveConf conf = new HiveConf(); + conf.set("spark.master", "local"); + conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestSparkSessionManagerImpl-local-dir").toString()); + return conf; + } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index fd9b72583a..180e35c190 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -21,6 +21,7 @@ import java.io.PrintStream; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.spark.client.rpc.RpcServer; @@ -88,4 +89,9 @@ public static SparkClient createClient(Map sparkConf, HiveConf h return new SparkClientImpl(server, sparkConf, hiveConf, sessionId); } + @VisibleForTesting + public static int getServerPort() { + return server.getPort(); + } + } diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index a535b8d333..6f76c9e7a8 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -46,13 +46,14 @@ private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class); public static final ImmutableSet HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of( - HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, - HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, - HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, - HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, - HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname + HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, + HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname ); public static final ImmutableSet HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of( HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,