diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index dd03d64..dbea253 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -65,7 +65,7 @@ private volatile boolean keepRunning; private volatile float progress; - private volatile Exception savedException; + private volatile Throwable savedException; public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) { this(new AMRMClientImpl(), intervalMs, callbackHandler); @@ -222,18 +222,12 @@ public void run() { try { response = client.allocate(progress); - } catch (YarnException ex) { - LOG.error("Yarn exception on heartbeat", ex); + } catch (Throwable ex) { + LOG.error("Exception on heartbeat", ex); savedException = ex; // interrupt handler thread in case it waiting on the queue handlerThread.interrupt(); return; - } catch (IOException e) { - LOG.error("IO exception on heartbeat", e); - savedException = e; - // interrupt handler thread in case it waiting on the queue - handlerThread.interrupt(); - return; } } if (response != null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index a7035b9..99e896e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -277,6 +277,8 @@ protected void populateNMTokens(AllocateResponse allocateResponse) { public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, IOException { + Preconditions.checkArgument(appStatus != null, + "AppStatus should not be null."); FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(appStatus, appMessage, appTrackingUrl); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 0d96185..d61e172 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -342,6 +342,40 @@ public void testCallBackThrowOutExceptionNoStop() throws YarnException, runCallBackThrowOutException(callbackHandler); } + @Test(timeout=10000) + public void testAMRMClientAsyncRunTimeException() throws Exception { + Configuration conf = new Configuration(); + TestCallbackHandler callbackHandler = new TestCallbackHandler(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + String exStr = "TestRunTimeException"; + RuntimeException mockRunTimeException = mock(RuntimeException.class); + when(mockRunTimeException.getMessage()).thenReturn(exStr); + when(client.allocate(anyFloat())).thenThrow(mockRunTimeException); + + AMRMClientAsync asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + + synchronized (callbackHandler.notifier) { + asyncClient.registerApplicationMaster("localhost", 1234, null); + while(callbackHandler.savedException == null) { + try { + callbackHandler.notifier.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr)); + + asyncClient.stop(); + // stopping should have joined all threads and completed all callbacks + Assert.assertTrue(callbackHandler.callbackCount == 0); + } + private AllocateResponse createAllocateResponse( List completed, List allocated, List nmTokens) {