diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index cf60b13..a767bbc 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -43,7 +43,7 @@ /** Used by client and driver to share a secret for establishing an RPC session. */ static final String CONF_KEY_SECRET = "spark.client.authentication.secret"; - private static RpcServer server = null; + private static volatile RpcServer server = null; private static final Object stopLock = new Object(); /** @@ -79,7 +79,7 @@ public static void stop() { * @param sparkConf Configuration for the remote Spark application, contains spark.* properties. * @param hiveConf Configuration for Hive, contains hive.* properties. */ - public static synchronized SparkClient createClient(Map sparkConf, HiveConf hiveConf) + public static SparkClient createClient(Map sparkConf, HiveConf hiveConf) throws IOException, SparkException { Preconditions.checkState(server != null, "initialize() not called."); return new SparkClientImpl(server, sparkConf, hiveConf); diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java index 08fb535..f116443 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java @@ -163,7 +163,7 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap) } @VisibleForTesting - Future registerClient(final String clientId, String secret, + synchronized Future registerClient(final String clientId, String secret, RpcDispatcher serverDispatcher, long clientTimeoutMs) { final Promise promise = group.next().newPromise(); @@ -230,16 +230,16 @@ public String createSecret() { return sb.toString(); } - public String getAddress() { + public synchronized String getAddress() { return address; } - public int getPort() { + public synchronized int getPort() { return port; } @Override - public void close() { + public synchronized void close() { try { channel.close(); for (ClientInfo client : pendingClients.values()) {