diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ccdfca6..7c3045e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3076,6 +3076,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." +
         "This is only necessary if the host has mutiple network addresses and if a different network address other than " +
         "hive.server2.thrift.bind.host is to be used."),
+    SPARK_RPC_SERVER_PORT("hive.spark.client.rpc.server.port", "", "A list of port ranges which can be used by RPC server " +
+        "with the format of 49152-49222,49228 and a random one is selected from the list. Default is empty, which randomly " +
+        "selects one port from all available ones."),
     SPARK_DYNAMIC_PARTITION_PRUNING(
         "hive.spark.dynamic.partition.pruning", false,
         "When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" +
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index e387659..ad28210 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -18,8 +18,11 @@ package org.apache.hive.spark.client.rpc;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import javax.security.sasl.Sasl;
 
@@ -107,6 +110,48 @@ String getServerAddress() throws IOException {
     return ServerUtils.getHostAddress(hiveHost).getHostName();
   }
 
+  /**
+   * Parses a port list specification such as "49152-49222,49228" and randomly
+   * picks one port out of it.
+   *
+   * @return a port from the configured list, or 0 (meaning "bind any free port")
+   *         when the configuration is empty or not in the correct format.
+   */
+  int getServerPort() {
+    String portString = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname);
+    if (StringUtils.isEmpty(portString)) {
+      return 0;
+    }
+    List<Integer> ports = new ArrayList<Integer>();
+    try {
+      for (String portRange : portString.split(",")) {
+        String[] range = portRange.trim().split("-");
+        if (range.length == 0 || range.length > 2
+            || (range.length == 2 && Integer.valueOf(range[0]) > Integer.valueOf(range[1]))) {
+          LOG.warn("Incorrect RPC server port configuration in HS2 server: " + portString);
+          return 0;
+        }
+        if (range.length == 1) {
+          ports.add(Integer.valueOf(range[0]));
+        } else {
+          for (int i = Integer.valueOf(range[0]); i <= Integer.valueOf(range[1]); i++) {
+            ports.add(i);
+          }
+        }
+      }
+    } catch (NumberFormatException e) {
+      LOG.warn("Incorrect RPC server port configuration in HS2 server: " + portString);
+      return 0;
+    }
+    if (ports.isEmpty()) {
+      // Guards against inputs like "," that split into no tokens; without this,
+      // Random.nextInt(0) below would throw IllegalArgumentException.
+      LOG.warn("Incorrect RPC server port configuration in HS2 server: " + portString);
+      return 0;
+    }
+    return ports.get(new Random().nextInt(ports.size()));
+  }
+
   String getRpcChannelLogLevel() {
     return config.get(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname);
   }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 68ee627..859668b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -108,7 +108,7 @@ public void run() {
           .option(ChannelOption.SO_BACKLOG, 1)
           .option(ChannelOption.SO_REUSEADDR, true)
           .childOption(ChannelOption.SO_KEEPALIVE, true)
-          .bind(0)
+          .bind(config.getServerPort())
           .sync()
           .channel();
       this.port = ((InetSocketAddress) channel.localAddress()).getPort();
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index d7969c9..bc1ab64 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -19,25 +19,27 @@
 import java.io.Closeable;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.security.sasl.SaslException;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.Future;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -141,6 +143,32 @@ public void testBadHello() throws Exception {
   }
 
   @Test
+  public void testServerPort() throws Exception {
+    Map<String, String> config = new HashMap<String, String>();
+
+    // Unset port config: the server binds to an ephemeral port chosen by the
+    // OS. The ephemeral range is platform dependent (e.g. 32768-60999 on
+    // Linux), so only assert that a real port was bound.
+    RpcServer server0 = autoClose(new RpcServer(config));
+    assertTrue("Empty port range should still return a random valid port", server0.getPort() > 0);
+
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, "49152-49222,49223,49224-49333");
+    RpcServer server1 = autoClose(new RpcServer(config));
+    assertTrue("Port should be within configured port range",
+        server1.getPort() >= 49152 && server1.getPort() <= 49333);
+
+    int expectedPort = 65535;
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, String.valueOf(expectedPort));
+    RpcServer server2 = autoClose(new RpcServer(config));
+    assertTrue("Port should match configured one", server2.getPort() == expectedPort);
+
+    // Invalid range (start > end) falls back to an OS-assigned ephemeral port.
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, "49552-49222,49223,49224-49333");
+    RpcServer server3 = autoClose(new RpcServer(config));
+    assertTrue("Invalid port range should still return a random valid port", server3.getPort() > 0);
+  }
+
+  @Test
   public void testCloseListener() throws Exception {
     RpcServer server = autoClose(new RpcServer(emptyConfig));
     Rpc[] rpcs = createRpcConnection(server);