Description
In YARN client mode, when YarnClientSchedulerBackend is stopped, it first interrupts the YARN application monitor thread. MonitorThread.run() catches InterruptedException so that it can respond gracefully to the stop request.
However, client.monitorApplication can also throw InterruptedIOException if the thread is interrupted while a Hadoop RPC call is in flight. In that case MonitorThread does not realize it has been interrupted: the application is reported as failed, and "Failed to contact YARN for application xxxxx" / "YARN application has exited unexpectedly with state xxxxx" are logged at error level, which is very confusing to users.
We should also handle InterruptedIOException here so that it behaves the same way as InterruptedException; a sketch of the change follows the current code below.
// current code in YarnClientSchedulerBackend
private class MonitorThread extends Thread {
  private var allowInterrupt = true

  override def run() {
    try {
      val YarnAppReport(_, state, diags) =
        client.monitorApplication(appId.get, logApplicationReport = false)
      logError(s"YARN application has exited unexpectedly with state $state! " +
        "Check the YARN application logs for more details.")
      diags.foreach { err =>
        logError(s"Diagnostics message: $err")
      }
      allowInterrupt = false
      sc.stop()
    } catch {
      case e: InterruptedException => logInfo("Interrupting monitor thread")
    }
  }
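A minimal sketch of how the catch clause could be extended so that InterruptedIOException is treated like InterruptedException (this multi-pattern form is one possible shape of the fix, not the final patch; it assumes java.io.InterruptedIOException is imported):

    } catch {
      // InterruptedIOException is what the Hadoop RPC layer throws when the
      // thread is interrupted mid-call; treat it the same as InterruptedException.
      case e @ (_: InterruptedException | _: InterruptedIOException) =>
        logInfo("Interrupting monitor thread")
    }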
// wrong error message
2020-12-05 03:06:58,000 ERROR [YARN application state monitor]: org.apache.spark.deploy.yarn.Client(91) - Failed to contact YARN for application application_1605868815011_1154961.
java.io.InterruptedIOException: Call interrupted
	at org.apache.hadoop.ipc.Client.call(Client.java:1466)
	at org.apache.hadoop.ipc.Client.call(Client.java:1409)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
	at com.sun.proxy.$Proxy38.getApplicationReport(Unknown Source)
	at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getApplicationReport(ApplicationClientProtocolPBClientImpl.java:187)
	at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
	at com.sun.proxy.$Proxy39.getApplicationReport(Unknown Source)
	at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getApplicationReport(YarnClientImpl.java:408)
	at org.apache.spark.deploy.yarn.Client.getApplicationReport(Client.scala:327)
	at org.apache.spark.deploy.yarn.Client.monitorApplication(Client.scala:1039)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:116)
2020-12-05 03:06:58,000 ERROR [YARN application state monitor]: org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend(70) - YARN application has exited unexpectedly with state FAILED! Check the YARN application logs for more details.
2020-12-05 03:06:58,001 ERROR [YARN application state monitor]: org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend(70) - Diagnostics message: Failed to contact YARN for application application_1605868815011_1154961.
// hadoop ipc code
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  final Call call = createCall(rpcKind, rpcRequest);
  Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);
  try {
    connection.sendRpcRequest(call);                 // send the rpc request
  } catch (RejectedExecutionException e) {
    throw new IOException("connection has been closed", e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.warn("interrupted waiting to send rpc request to server", e);
    throw new IOException(e);
  }

  synchronized (call) {
    while (!call.done) {
      try {
        call.wait();                           // wait for the result
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Call interrupted");
      }
    }
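Note that java.io.InterruptedIOException extends IOException, not InterruptedException, so the existing `case e: InterruptedException` clause in MonitorThread.run() never matches it and the interrupted call simply looks like a failure. A small standalone Scala check illustrating the class hierarchy (for reference only, not part of the patch):

import java.io.{IOException, InterruptedIOException}

object InterruptedIOExceptionCheck extends App {
  val e = new InterruptedIOException("Call interrupted")
  // It is an IOException...
  assert(e.isInstanceOf[IOException])
  // ...but not an InterruptedException, so a catch of InterruptedException alone
  // does not handle it.
  assert(!e.isInstanceOf[InterruptedException])
  println("InterruptedIOException is an IOException, not an InterruptedException")
}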