diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 71e432d..1bcd221 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -180,7 +180,7 @@ void cancel(String jobId) { protocol.cancel(jobId); } - private Thread startDriver(RpcServer rpcServer, final String clientId, final String secret) + private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret) throws IOException { Runnable runnable; final String serverAddress = rpcServer.getAddress(); @@ -424,6 +424,7 @@ public void run() { try { int exitCode = child.waitFor(); if (exitCode != 0) { + rpcServer.cancelClient(clientId, "Child process exited before connecting back"); LOG.warn("Child process exited with code {}.", exitCode); } } catch (InterruptedException ie) { diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java index 32d4c46..d9cb81b 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java @@ -164,6 +164,25 @@ public void operationComplete(Promise p) { } /** + * Tells the RPC server to cancel the connection from an existing pending client + * @param clientId The identifier for the client + * @param msg The error message about why the connection should be canceled + */ + public void cancelClient(final String clientId, final String msg) { + if (!pendingClients.containsKey(clientId)) { + // Nothing to be done here. The connection has already been established. + return; + } + final ClientInfo cinfo = pendingClients.remove(clientId); + if (cinfo != null) { + cinfo.timeoutFuture.cancel(true); + if (!cinfo.promise.isDone()) { + cinfo.promise.setFailure(new RuntimeException("Cancel client '%s'. Error: " + msg)); + } + } + } + + /** * Creates a secret for identifying a client connection. */ public String createSecret() {