commit 28cb68414f0fc9a8d8aa278d2d02c725971f32d2 Author: Bharath Krishna Date: Thu Jul 26 16:35:44 2018 -0700 HIVE-19814 : Fix issue that RPC Server port is always random for spark diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 5ed5d4214e..6a1ffdf9f6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.spark.client.SparkClientUtilities; +import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -197,7 +198,12 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se LOG.debug(String.format( "Pass Oozie configuration (%s -> %s).", propertyName, LogUtils.maskIfPassword(propertyName,value))); } - + if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) { + String value = RpcConfiguration.getValue(hiveConf, propertyName); + sparkConf.put(propertyName, value); + LOG.debug(String.format("load RPC property from hive configuration (%s -> %s).", propertyName, + LogUtils.maskIfPassword(propertyName, value))); + } } final boolean optShuffleSerDe = hiveConf.getBoolVar( diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java index 6964764ff2..e19a6f5797 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java @@ -27,6 +27,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeoutException; @@ -185,6 +186,18 @@ public void testGetHiveException() throws Exception { "java.lang.NoClassDefFoundError: org/apache/spark/SparkConf"); } + @Test + public void testConfigsForInitialization() { + //Test to make sure that configs listed in RpcConfiguration.HIVE_SPARK_RSC_CONFIGS which are passed + // through HiveConf are included in the Spark configuration. + HiveConf hiveConf = getHiveConf(); + hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333"); + hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS, "test-rpc-server-address"); + Map sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null); + assertEquals("49152-49222,49223,49224-49333", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname)); + assertEquals("test-rpc-server-address", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname)); + } + private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) { checkHiveException(ss, e, expectedErrMsg, null); } diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index bd3a7a7321..eb824efdfb 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -46,14 +46,15 @@ private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class); public static final ImmutableSet HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of( - HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, - HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, - HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, - HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, - HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname + HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, + HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname ); public static final ImmutableSet HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of( HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,