diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 068c962..b420b0a 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1982,12 +1982,24 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "hive.tez.exec.inplace.progress", true, "Updates tez job execution progress in-place in the terminal."), - SPARK_CLIENT_FUTURE_TIMEOUT( - "hive.spark.client.future.timeout", - "60s", - new TimeValidator(TimeUnit.SECONDS), - "remote spark client JobHandle future timeout value in seconds.") - ; + SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", + "60s", new TimeValidator(TimeUnit.SECONDS), + "Timeout for requests from client to remote spark driver."), + SPARK_RPC_CLIENT_CONNECT_TIMEOUT("hive.spark.client.connect.timeout", + "1000ms", new TimeValidator(TimeUnit.MILLISECONDS), + "Timeout for remote spark driver in connecting back to client."), + SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT("hive.spark.client.server.connect.timeout", + "20000ms", new TimeValidator(TimeUnit.MILLISECONDS), + "Timeout for handshake between client and remote spark driver. Checked by both processes."), + SPARK_RPC_SECRET_RANDOM_BITS("hive.spark.client.secret.bits", "256", + "Number of bits of randomness in the generated secret for communication between client and remote spark driver. " + + "Rounded down to the nearest multiple of 8."), + SPARK_RPC_MAX_THREADS("hive.spark.client.rpc.threads", 8, + "Maximum number of threads for remote spark driver's RPC event loop."), + SPARK_RPC_MAX_MESSAGE_SIZE("hive.spark.client.rpc.max.size", 50 * 1024 * 1024, + "Maximum message size in bytes for communication between client and remote spark driver. Default is 50MB."), + SPARK_RPC_CHANNEL_LOG_LEVEL("hive.spark.client.channel.log.level", null, + "Channel logging level for remote spark driver. 
One of {DEBUG, ERROR, INFO, TRACE, WARN}"); public final String varname; private final String defaultExpr; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 334c191..22034c6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -21,14 +21,18 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.commons.compress.utils.CharsetNames; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.spark.SparkConf; import org.apache.spark.SparkException; @@ -105,11 +109,30 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) "load spark configuration from hive configuration (%s -> %s).", propertyName, value)); } + if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) { + String value = null; + if (RpcConfiguration.SPARK_TIME_CONFIGS.contains(propertyName)) { + value = getMilliSecondValue(hiveConf, HiveConf.getConfVars(propertyName)); + } else { + value = hiveConf.get(propertyName); + } + //SparkConf only accepts properties starting with 'spark'. + //Hence all our RSC configs need to be prepended with this (see RpcConfiguration.java) + sparkConf.put("spark." 
+ propertyName, value); +        LOG.info(String.format( +            "load spark configuration from hive configuration (%s -> %s).", +            propertyName, value)); +      } +    } return sparkConf; }   +  private static String getMilliSecondValue(HiveConf conf, HiveConf.ConfVars var) { +    return String.valueOf(conf.getTimeVar(var, TimeUnit.MILLISECONDS)); +  }   static SparkConf generateSparkConf(Map conf) { SparkConf sparkConf = new SparkConf(false); for (Map.Entry entry : conf.entrySet()) { diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 865e03e..9a63e31 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;   import org.apache.hive.spark.client.rpc.Rpc; +import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.hive.spark.client.rpc.RpcServer; import org.apache.spark.SparkContext; import org.apache.spark.SparkException; @@ -299,7 +300,7 @@ public void run() { argv.add("org.apache.spark.deploy.SparkSubmit"); }   argv.add("--properties-file"); argv.add(properties.getAbsolutePath()); argv.add("--class"); diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java index ac71ae9..fffe24b 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java @@ -84,7 +84,7 @@ final String secret, final RpcDispatcher dispatcher) throws Exception { final RpcConfiguration rpcConf = new RpcConfiguration(config); -    int connectTimeoutMs = rpcConf.getConnectTimeoutMs(); +    int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs(); final ChannelFuture cf = new Bootstrap() .group(eloop) diff --git
spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index 5a826ba..b78f476 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -21,9 +21,13 @@ import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; +import java.util.Arrays; import java.util.Enumeration; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.conf.HiveConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,63 +41,49 @@ private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class); - /** Connection timeout for RPC clients. */ - public static final String CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.connect.timeout.ms"; - private static final int CONNECT_TIMEOUT_MS_DEFAULT = 1000; - - /** - * How long the server should wait for clients to connect back after they're - * registered. Also used to time out the client waiting for the server to - * reply to its "hello" message. - */ - public static final String SERVER_CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.server.connect.timeout.ms"; - private static final long SERVER_CONNECT_TIMEOUT_MS_DEFAULT = 10000L; - - /** - * Number of bits of randomness in the generated client secrets. Rounded down - * to the nearest multiple of 8. - */ - public static final String SECRET_RANDOM_BITS_KEY = "hive.spark.client.secret.bits"; - private static final int SECRET_RANDOM_BITS_DEFAULT = 256; - - /** Hostname or IP address to advertise for the server. */ - public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address"; - - /** Maximum number of threads to use for the RPC event loop. 
*/ - public static final String RPC_MAX_THREADS_KEY = "hive.spark.client.rpc.threads"; - public static final int RPC_MAX_THREADS_DEFAULT = 8; - - /** Maximum message size. Default = 10MB. */ - public static final String RPC_MAX_MESSAGE_SIZE_KEY = "hive.spark.client.rpc.max.size"; - public static final int RPC_MAX_MESSAGE_SIZE_DEFAULT = 50 * 1024 * 1024; + public static final List HIVE_SPARK_RSC_CONFIGS = Arrays.asList(new String[]{ + HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, + HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname + }); + public static final List SPARK_TIME_CONFIGS = Arrays.asList(new String[] { + HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname + }); - /** Channel logging level. */ - public static final String RPC_CHANNEL_LOG_LEVEL_KEY = "hive.spark.client.channel.log.level"; + public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address"; private final Map config; + private static final HiveConf DEFAULT_CONF = new HiveConf(); + public RpcConfiguration(Map config) { this.config = config; } - int getConnectTimeoutMs() { - String value = config.get(CONNECT_TIMEOUT_MS_KEY); - return value != null ? Integer.parseInt(value) : CONNECT_TIMEOUT_MS_DEFAULT; + long getConnectTimeoutMs() { + String value = config.get("spark." + HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname); + return value != null ? Integer.parseInt(value) : DEFAULT_CONF.getTimeVar( + HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS); } int getMaxMessageSize() { - String value = config.get(RPC_MAX_MESSAGE_SIZE_KEY); - return value != null ? 
Integer.parseInt(value) : RPC_MAX_MESSAGE_SIZE_DEFAULT; +    String value = config.get("spark." + HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname); +    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.defaultIntVal; }   long getServerConnectTimeoutMs() { -    String value = config.get(SERVER_CONNECT_TIMEOUT_MS_KEY); -    return value != null ? Long.parseLong(value) : SERVER_CONNECT_TIMEOUT_MS_DEFAULT; +    String value = config.get("spark." + HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname); +    return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar( +        HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT, TimeUnit.MILLISECONDS); }   int getSecretBits() { -    String value = config.get(SECRET_RANDOM_BITS_KEY); -    return value != null ? Integer.parseInt(value) : SECRET_RANDOM_BITS_DEFAULT; +    String value = config.get("spark." + HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname); +    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal; }   String getServerAddress() throws IOException { @@ -133,12 +123,12 @@ String getServerAddress() throws IOException { }   String getRpcChannelLogLevel() { -    return config.get(RPC_CHANNEL_LOG_LEVEL_KEY); +    return config.get("spark." + HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname); }   public int getRpcThreadCount() { -    String value = config.get(RPC_MAX_THREADS_KEY); -    return value != null ? Integer.parseInt(value) : RPC_MAX_THREADS_DEFAULT; +    String value = config.get("spark." + HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname); +    return value != null ?
Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.defaultIntVal; } } diff --git spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java index a2dd3e6..2d64740 100644 --- spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java +++ spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java @@ -32,6 +32,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.Future; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ private Collection closeables; private Map emptyConfig = - ImmutableMap.of(RpcConfiguration.RPC_CHANNEL_LOG_LEVEL_KEY, "DEBUG"); + ImmutableMap.of("spark." + HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, "DEBUG"); @Before public void setUp() {