diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index f0ab324ace8..11f5b464769 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -257,6 +259,9 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); try { + if (isAppStateFinal(appState)) { + pruneAppState(appState); + } store.updateApplicationStateInternal(appId, appState); if (((RMStateUpdateAppEvent) event).isNotifyApplication()) { store.notifyApplication(new RMAppEvent(appId, @@ -276,7 +281,30 @@ public RMStateStoreState transition(RMStateStore store, } } return finalState(isFenced); - }; + } + + private boolean isAppStateFinal(ApplicationStateData appState) { + RMAppState state = appState.getState(); + return state == RMAppState.FINISHED || state == RMAppState.FAILED || + state == RMAppState.KILLED; + } + + private void pruneAppState(ApplicationStateData appState) { + ApplicationSubmissionContext srcCtx = + appState.getApplicationSubmissionContext(); + ApplicationSubmissionContextPBImpl context = + new ApplicationSubmissionContextPBImpl(); + // most fields in the ApplicationSubmissionContex are not needed, + // but the following few need to be present for recovery to succeed + context.setApplicationId(srcCtx.getApplicationId()); + context.setResource(srcCtx.getResource()); + context.setQueue(srcCtx.getQueue()); + context.setAMContainerResourceRequests( + srcCtx.getAMContainerResourceRequests()); + context.setApplicationType(srcCtx.getApplicationType()); + context.setAMContainerSpec(context.getAMContainerSpec()); + appState.setApplicationSubmissionContext(context); + } } private static class RemoveAppTransition implements diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 6a8f47d085c..74af8a9947f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -35,7 +35,9 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; @@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -83,6 +86,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -1488,4 +1492,64 @@ public void testDelegationTokenNodeWithSplitChangeAcrossRestarts() tokensWithIndex, sequenceNumber, 3); store.close(); } + + @Test + public void testAppSubmissionContextIsPrunedInFinalApplicationState() + throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + ApplicationId appId = ApplicationId.fromString("application_1234_0010"); + + Configuration conf = createConfForDelegationTokenNodeSplit(1); + RMStateStore store = zkTester.getRMStateStore(conf); + ApplicationSubmissionContext ctx = + new ApplicationSubmissionContextPBImpl(); + ctx.setApplicationId(appId); + ctx.setQueue("a_queue"); + ContainerLaunchContextPBImpl containerLaunchCtx = + new ContainerLaunchContextPBImpl(); + containerLaunchCtx.setCommands(Collections.singletonList("a_command")); + ctx.setAMContainerSpec(containerLaunchCtx); + Resource resource = new ResourcePBImpl(); + resource.setMemorySize(17L); + ctx.setResource(resource); + Map schedulingPropertiesMap = + Collections.singletonMap("a_key", "a_value"); + ctx.setApplicationSchedulingPropertiesMap(schedulingPropertiesMap); + ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(); + appState.setState(RMAppState.RUNNING); + appState.setApplicationSubmissionContext(ctx); + store.storeApplicationStateInternal(appId, appState); + + RMState rmState = store.loadState(); + assertEquals(1, rmState.getApplicationState().size()); + ctx = rmState.getApplicationState().get(appId) + .getApplicationSubmissionContext(); + + appState.setState(RMAppState.RUNNING); + store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null)); + + rmState = store.loadState(); + ctx = rmState.getApplicationState().get(appId) + .getApplicationSubmissionContext(); + + assertEquals("ApplicationSchedulingPropertiesMap should not have been pruned" + + " from the application submission context before the FINISHED state", + schedulingPropertiesMap, ctx.getApplicationSchedulingPropertiesMap()); + + appState.setState(RMAppState.FINISHED); + store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null)); + + rmState = store.loadState(); + ctx = rmState.getApplicationState().get(appId) + .getApplicationSubmissionContext(); + + assertEquals(appId, ctx.getApplicationId()); + assertEquals("a_queue", ctx.getQueue()); + assertNotNull(ctx.getAMContainerSpec()); + assertEquals(17L, ctx.getResource().getMemorySize()); + assertEquals("ApplicationSchedulingPropertiesMap should have been pruned" + + " from the application submission context when in FINISHED STATE", + Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap()); + store.close(); + } }