commit 1fa2d1d974aecdeb0b5aa5bb5cd20b1f770a4c33 Author: Sahil Takiar Date: Thu Oct 26 11:03:23 2017 -0700 HIVE-17929: Use sessionId for HoS Remote Driver Client id diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index bdb0798442..f9e93284ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -73,7 +73,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se // With local spark context, all user sessions share the same spark context. return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf), hiveconf); } else { - return new RemoteHiveSparkClient(hiveconf, sparkConf); + return new RemoteHiveSparkClient(hiveconf, sparkConf, sessionId); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 9b38c1a1f1..d9d4b372b0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -88,18 +88,20 @@ private transient List localFiles = new ArrayList(); private final transient long sparkClientTimtout; + private final String sessionId; - RemoteHiveSparkClient(HiveConf hiveConf, Map conf) throws Exception { + RemoteHiveSparkClient(HiveConf hiveConf, Map conf, String sessionId) throws Exception { this.hiveConf = hiveConf; sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS); sparkConf = HiveSparkClientFactory.generateSparkConf(conf); this.conf = conf; + this.sessionId = sessionId; createRemoteClient(); } private void createRemoteClient() throws Exception { - remoteClient = SparkClientFactory.createClient(conf, hiveConf); + remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId); if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) && (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) || diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index 8cedd30e1b..50c7bb20c4 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -83,10 +83,10 @@ public static void stop() { * @param sparkConf Configuration for the remote Spark application, contains spark.* properties. * @param hiveConf Configuration for Hive, contains hive.* properties. */ - public static SparkClient createClient(Map sparkConf, HiveConf hiveConf) + public static SparkClient createClient(Map sparkConf, HiveConf hiveConf, String sessionId) throws IOException, SparkException { Preconditions.checkState(server != null, "initialize() not called."); - return new SparkClientImpl(server, sparkConf, hiveConf); + return new SparkClientImpl(server, sparkConf, hiveConf, sessionId); } } diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index f6a23dc600..49b7deb5ee 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -90,19 +90,19 @@ private final ClientProtocol protocol; private volatile boolean isAlive; - SparkClientImpl(RpcServer rpcServer, Map conf, HiveConf hiveConf) throws IOException, SparkException { + SparkClientImpl(RpcServer rpcServer, Map conf, HiveConf hiveConf, + String sessionid) throws IOException, SparkException { this.conf = conf; this.hiveConf = hiveConf; this.jobs = Maps.newConcurrentMap(); - String clientId = UUID.randomUUID().toString(); String secret = rpcServer.createSecret(); - this.driverThread = startDriver(rpcServer, clientId, secret); + this.driverThread = startDriver(rpcServer, sessionid, secret); this.protocol = new ClientProtocol(); try { // The RPC server will take care of timeouts here. - this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get(); + this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get(); } catch (Throwable e) { String errorMsg = null; if (e.getCause() instanceof TimeoutException) { diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java index a27eb2b116..697d8d144d 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -313,7 +314,7 @@ private void runTest(boolean local, TestFunction test) throws Exception { SparkClient client = null; try { test.config(conf); - client = SparkClientFactory.createClient(conf, HIVECONF); + client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString()); test.call(client); } finally { if (client != null) {