Let the RM state store update an app/attempt whose state was never stored.

* ZKRMStateStore.updateApplicationStateInternal and
  updateApplicationAttemptStateInternal first check whether the znode exists.
  If it does, the data is set with -1 as the last argument instead of 0
  (presumably the expected znode version, where -1 matches any version;
  confirm against setDataWithRetries). Otherwise the znode is created with
  zkAcl and CreateMode.PERSISTENT and the creation is logged.
* ZKRMStateStore runWithRetries logs the caught exception before rethrowing
  it once it is no longer retried.
* RMStateStore error messages now read "storing/updating".
* RMStateStoreTestBase updates a dummy app and attempt that were never
  stored and asserts both are present after the state is reloaded.

NOTE(review): leading whitespace of the hunk lines was reconstructed assuming
2-space indentation; verify the context lines against the target files
before applying.

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 32a06c4..fc4537c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -628,7 +628,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
           notifyDoneUpdatingApplication(appId, storedException);
         }
       } catch (Exception e) {
-        LOG.error("Error storing app: " + appId, e);
+        LOG.error("Error storing/updating app: " + appId, e);
         notifyStoreOperationFailed(e);
       }
     } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
@@ -679,7 +679,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
         }
       } catch (Exception e) {
         LOG.error(
-          "Error storing appAttempt: " + attemptState.getAttemptId(), e);
+          "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
         notifyStoreOperationFailed(e);
       }
     } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index fdfd6cd..07ce21f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -566,7 +566,15 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId,
           + nodeUpdatePath);
     }
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
-    setDataWithRetries(nodeUpdatePath, appStateData, 0);
+
+    if (zkClient.exists(nodeUpdatePath, true) != null) {
+      setDataWithRetries(nodeUpdatePath, appStateData, -1);
+    } else {
+      createWithRetries(nodeUpdatePath, appStateData, zkAcl,
+        CreateMode.PERSISTENT);
+      LOG.info(appId + " znode didn't exist. Created a new znode to"
+          + " update the application state.");
+    }
   }
 
   @Override
@@ -601,7 +609,15 @@ public synchronized void updateApplicationAttemptStateInternal(
           + " at: " + nodeUpdatePath);
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-    setDataWithRetries(nodeUpdatePath, attemptStateData, 0);
+
+    if (zkClient.exists(nodeUpdatePath, true) != null) {
+      setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
+    } else {
+      createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
+        CreateMode.PERSISTENT);
+      LOG.info(appAttemptId + " znode didn't exist. Created a new znode to"
+          + " update the application attempt state.");
+    }
   }
 
   @Override
@@ -961,6 +977,7 @@ T runWithRetries() throws Exception {
             Thread.sleep(zkRetryInterval);
             continue;
           }
+          LOG.info("Error while doing ZK operation.", ke);
           throw ke;
         }
       }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 009e96e..507e164 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -310,6 +310,30 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
           "myTrackingUrl", "attemptDiagnostics",
           FinalApplicationStatus.SUCCEEDED);
     store.updateApplicationAttemptState(newAttemptState);
+
+    // test updating the state of an app/attempt whose initial state was not
+    // saved.
+    ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
+    ApplicationSubmissionContext dummyContext =
+        new ApplicationSubmissionContextPBImpl();
+    dummyContext.setApplicationId(dummyAppId);
+    ApplicationState dummyApp =
+        new ApplicationState(appState.submitTime, appState.startTime,
+          dummyContext, appState.user, RMAppState.FINISHED, "appDiagnostics",
+          1234);
+    store.updateApplicationState(dummyApp);
+
+    ApplicationAttemptId dummyAttemptId =
+        ApplicationAttemptId.newInstance(dummyAppId, 6);
+    ApplicationAttemptState dummyAttempt =
+        new ApplicationAttemptState(dummyAttemptId,
+          oldAttemptState.getMasterContainer(),
+          oldAttemptState.getAppAttemptCredentials(),
+          oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
+          "myTrackingUrl", "attemptDiagnostics",
+          FinalApplicationStatus.SUCCEEDED);
+    store.updateApplicationAttemptState(dummyAttempt);
+
     // let things settle down
     Thread.sleep(1000);
     store.close();
@@ -320,6 +344,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
     RMState newRMState = store.loadState();
     Map<ApplicationId, ApplicationState> newRMAppState =
         newRMState.getApplicationState();
+    assertNotNull(newRMAppState.get(dummyApp.getAppId()));
     ApplicationState updatedAppState = newRMAppState.get(appId1);
     assertEquals(appState.getAppId(),updatedAppState.getAppId());
     assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime());
@@ -331,6 +356,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
     assertEquals(1234, updatedAppState.getFinishTime());
 
     // check updated attempt state
+    assertNotNull(newRMAppState.get(dummyApp.getAppId()).getAttempt(
+        dummyAttemptId));
     ApplicationAttemptState updatedAttemptState =
         updatedAppState.getAttempt(newAttemptState.getAttemptId());
     assertEquals(oldAttemptState.getAttemptId(),