diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 71e432d..7fb4bc7 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -180,7 +180,7 @@ void cancel(String jobId) { protocol.cancel(jobId); } - private Thread startDriver(RpcServer rpcServer, final String clientId, final String secret) + private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret) throws IOException { Runnable runnable; final String serverAddress = rpcServer.getAddress(); @@ -424,6 +424,7 @@ public void run() { try { int exitCode = child.waitFor(); if (exitCode != 0) { + rpcServer.cancelClient(clientId); LOG.warn("Child process exited with code {}.", exitCode); } } catch (InterruptedException ie) { diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java index 32d4c46..91d6b27 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java @@ -164,6 +164,24 @@ public void operationComplete(Promise p) { } /** + * Tells the RPC server to cancel the connection from an existing pending client + * @param clientId The identifier for the client + * @throws java.lang.IllegalStateException if the client is not found in the pending client map + */ + public void cancelClient(final String clientId) { + if (!pendingClients.containsKey(clientId)) { + throw new IllegalStateException( + String.format("Client '%s' is not registered.", clientId)); + } + final ClientInfo cinfo = pendingClients.remove(clientId); + cinfo.timeoutFuture.cancel(true); + if (!cinfo.promise.isDone()) { + cinfo.promise.setFailure(new RuntimeException( + String.format("Remote process failed - cancel pending client '%s'.", clientId))); + } + } + + /** * Creates a secret for identifying a client connection. */ public String createSecret() {