diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 44c3ccfbd2c..10e28ef66ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -329,91 +329,105 @@ public void testApplicationStorage() throws IOException { assertEquals(appProto1, apps.get(0)); } - @Test - public void testContainerStorage() throws IOException { - // test empty when no state - List recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertTrue(recoveredContainers.isEmpty()); - - // create a container request - ApplicationId appId = ApplicationId.newInstance(1234, 3); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 4); - ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); - Resource containerResource = Resource.newInstance(1024, 2); - StartContainerRequest containerReq = - createContainerRequest(containerId, containerResource); - - // store a container and verify recovered - long containerStartTime = System.currentTimeMillis(); - stateStore.storeContainer(containerId, 0, containerStartTime, containerReq); - - // verify the container version key is not stored for new containers - DB db = stateStore.getDB(); - assertNull("version key present for new container", db.get(bytes( - stateStore.getContainerVersionKey(containerId.toString())))); + @Test + public void testContainerStorageWhenContainerIsRequested() + throws IOException { + final ContainerStateConstructParams containerParams = + storeContainerInStateStore(); restartStateStore(); - recoveredContainers = + + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); - RecoveredContainerState rcs = recoveredContainers.get(0); + final RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); - assertEquals(containerStartTime, rcs.getStartTime()); + assertEquals(containerParams.getContainerStartTime().longValue(), + rcs.getStartTime()); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(containerParams.getContainerRequest(), rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); - assertEquals(containerResource, rcs.getCapability()); + assertEquals(containerParams.getContainerResource(), rcs.getCapability()); + } + - // store a new container record without StartContainerRequest - ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); - stateStore.storeContainerLaunched(containerId1); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - // check whether the new container record is discarded - assertEquals(1, recoveredContainers.size()); - // queue the container, and verify recovered + @Test + public void testContainerStorageWhenContainerIsQueued() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + StartContainerRequest containerReq = containerParams.getContainerRequest(); + Resource containerResource = containerParams.getContainerResource(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); restartStateStore(); - recoveredContainers = + + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); assertEquals(containerResource, rcs.getCapability()); + } - // launch the container, add some diagnostics, and verify recovered - StringBuilder diags = new StringBuilder(); - stateStore.storeContainerLaunched(containerId); - diags.append("some diags for container"); - stateStore.storeContainerDiagnostics(containerId, diags); + @Test + public void testContainerStorageWhenContainerIsLaunched() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + StartContainerRequest containerReq = containerParams.getContainerRequest(); + Resource containerResource = containerParams.getContainerResource(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + + StringBuilder diags = launchContainerWithDiagnostics(containerId); restartStateStore(); - recoveredContainers = + + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); assertEquals(containerResource, rcs.getCapability()); + } + + @Test + public void testContainerStorageWhenContainerIsPaused() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + StartContainerRequest containerReq = containerParams.getContainerRequest(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); - // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); restartStateStore(); - recoveredContainers = + + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); @@ -425,82 +439,261 @@ public void testContainerStorage() throws IOException { recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); + } - // increase the container size, and verify recovered - ContainerTokenIdentifier updateTokenIdentifier = - new ContainerTokenIdentifier(containerId, "host", "user", - Resource.newInstance(2468, 4), 9876543210L, 42, 2468, - Priority.newInstance(7), 13579); + @Test + public void testContainerStorageWhenContainerSizeIncreased() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); - stateStore - .storeContainerUpdateToken(containerId, updateTokenIdentifier); + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + launchContainerWithDiagnostics(containerId); + + increaseContainerSize(containerId); restartStateStore(); - recoveredContainers = + + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); assertEquals(Resource.newInstance(2468, 4), rcs.getCapability()); + } - // mark the container killed, add some more diags, and verify recovered - diags.append("some more diags for container"); - stateStore.storeContainerDiagnostics(containerId, diags); - stateStore.storeContainerKilled(containerId); + @Test + public void testContainerStorageWhenContainerMarkedAsKilled() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + StringBuilder diags = launchContainerWithDiagnostics(containerId); + ContainerTokenIdentifier updateTokenIdentifier = + increaseContainerSize(containerId); + + markContainerAsKilled(containerId, diags); restartStateStore(); - recoveredContainers = + + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertTrue(rcs.getKilled()); ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils - .newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken()); + .newContainerTokenIdentifier(rcs.getStartRequest() + .getContainerToken()); assertEquals(updateTokenIdentifier, tokenReadFromRequest); assertEquals(diags.toString(), rcs.getDiagnostics()); + } + + @Test + public void testContainerStorageWhenContainerCompleted() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + StringBuilder diags = launchContainerWithDiagnostics(containerId); + markContainerAsKilled(containerId, diags); - // add yet more diags, mark container completed, and verify recovered + // add yet more diags, mark container completed diags.append("some final diags"); stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerCompleted(containerId, 21); restartStateStore(); - recoveredContainers = + + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); assertEquals(21, rcs.getExitCode()); assertTrue(rcs.getKilled()); assertEquals(diags.toString(), rcs.getDiagnostics()); + } - // store remainingRetryAttempts, workDir and logDir + @Test + public void testContainerStorage() throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + + // remaining retry attempts, work dirand logdir are stored stateStore.storeContainerRemainingRetryAttempts(containerId, 6); stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); restartStateStore(); - recoveredContainers = + + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(6, rcs.getRemainingRetryAttempts()); assertEquals("/test/workdir", rcs.getWorkDir()); assertEquals("/test/logdir", rcs.getLogDir()); - validateRetryAttempts(containerId); + } + + @Test + public void testContainerStorageWhenContainerRemoved() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); - recoveredContainers = + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); // recover again to check remove clears all containers restartStateStore(); NMStateStoreService nmStoreSpy = spy(stateStore); loadContainersState(nmStoreSpy.getContainerStateIterator()); - verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class)); + verify(nmStoreSpy, times(0)).removeContainer(any(ContainerId.class)); + } + + private ContainerStateConstructParams storeContainerInStateStore() + throws IOException { + // test empty when no state + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertTrue(recoveredContainers.isEmpty()); + + // create a container request + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 4); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); + Resource containerResource = Resource.newInstance(1024, 2); + StartContainerRequest containerReq = + createContainerRequest(containerId, containerResource); + + long containerStartTime = System.currentTimeMillis(); + stateStore.storeContainer(containerId, 0, containerStartTime, + containerReq); + + // verify the container version key is not stored for new containers + DB db = stateStore.getDB(); + assertNull("version key present for new container", db.get(bytes( + stateStore.getContainerVersionKey(containerId.toString())))); + + return new ContainerStateConstructParams() + .setContainerRequest(containerReq) + .setContainerResource(containerResource) + .setContainerStartTime(containerStartTime) + .setAppAttemptId(appAttemptId) + .setContainerId(containerId); + } + + private static class ContainerStateConstructParams { + private StartContainerRequest containerRequest; + private Resource containerResource; + private Long containerStartTime; + private ApplicationAttemptId appAttemptId; + private ContainerId containerId; + + public ApplicationAttemptId getAppAttemptId() { + return appAttemptId; + } + public ContainerStateConstructParams setAppAttemptId(ApplicationAttemptId + theAppAttemptId) { + this.appAttemptId = theAppAttemptId; + return this; + } + public ContainerId getContainerId() { + return containerId; + } + public ContainerStateConstructParams setContainerId(ContainerId + theContainerId) { + this.containerId = theContainerId; + return this; + } + + public StartContainerRequest getContainerRequest() { + return containerRequest; + } + public ContainerStateConstructParams setContainerRequest( + StartContainerRequest theContainerRequest) { + this.containerRequest = theContainerRequest; + return this; + } + + public Resource getContainerResource() { + return containerResource; + } + + public ContainerStateConstructParams setContainerResource( + Resource theContainerResource) { + this.containerResource = theContainerResource; + return this; + } + + public Long getContainerStartTime() { + return containerStartTime; + } + + public ContainerStateConstructParams setContainerStartTime( + Long theContainerStartTime) { + this.containerStartTime = theContainerStartTime; + return this; + } + } + + private void markContainerAsKilled(ContainerId containerId, + StringBuilder diags) throws IOException { + // mark the container killed, add some more diags + diags.append("some more diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerKilled(containerId); + } + + private ContainerTokenIdentifier increaseContainerSize( + ContainerId containerId) throws IOException { + ContainerTokenIdentifier updateTokenIdentifier = + new ContainerTokenIdentifier(containerId, "host", "user", + Resource.newInstance(2468, 4), 9876543210L, 42, 2468, + Priority.newInstance(7), 13579); + stateStore + .storeContainerUpdateToken(containerId, updateTokenIdentifier); + return updateTokenIdentifier; + } + + private StringBuilder launchContainerWithDiagnostics(ContainerId containerId) + throws IOException { + StringBuilder diags = new StringBuilder(); + stateStore.storeContainerLaunched(containerId); + diags.append("some diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + return diags; + } + + private void storeNewContainerRecordWithoutStartContainerRequest( + ApplicationAttemptId appAttemptId) throws IOException { + // store a new container record without StartContainerRequest + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); + stateStore.storeContainerLaunched(containerId1); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + // check whether the new container record is discarded + assertEquals(1, recoveredContainers.size()); } private void validateRetryAttempts(ContainerId containerId) @@ -524,11 +717,6 @@ private StartContainerRequest createContainerRequest( return createContainerRequestInternal(containerId, res); } - private StartContainerRequest createContainerRequest( - ContainerId containerId) { - return createContainerRequestInternal(containerId, null); - } - private StartContainerRequest createContainerRequestInternal(ContainerId containerId, Resource res) { LocalResource lrsrc = LocalResource.newInstance( @@ -545,9 +733,9 @@ private StartContainerRequest createContainerRequestInternal(ContainerId containerCmds.add("somearg"); Map serviceData = new HashMap(); serviceData.put("someservice", - ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3})); ByteBuffer containerTokens = - ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa}); Map acls = new HashMap(); acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); @@ -676,7 +864,8 @@ public void testLocalTrackerStateIterator() throws IOException { } @Test - public void testStartResourceLocalization() throws IOException { + public void testStartResourceLocalizationForApplicationResource() + throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); @@ -730,10 +919,14 @@ public void testStartResourceLocalization() throws IOException { assertEquals(1, startedResources.size()); assertEquals(appRsrcLocalPath, startedResources.get(appRsrcProto)); + } - // start some public and private resources + @Test + public void testStartResourceLocalizationForPublicResources() + throws IOException { Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( URL.fromPath(pubRsrcPath1), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 789L, 135L); @@ -750,23 +943,14 @@ public void testStartResourceLocalization() throws IOException { Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); stateStore.startResourceLocalization(null, null, pubRsrcProto2, pubRsrcLocalPath2); - Path privRsrcPath = new Path("hdfs://some/private/resource"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( - URL.fromPath(privRsrcPath), - LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, - 789L, 680L, "*pattern*"); - LocalResourceProto privRsrcProto = rsrcPb.getProto(); - Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); - stateStore.startResourceLocalization(user, null, privRsrcProto, - privRsrcLocalPath); // restart and verify resources are marked in-progress restartStateStore(); - state = stateStore.loadLocalizationState(); - pubts = state.getPublicTrackerState(); - completedResources = loadCompletedResources( + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + List completedResources = loadCompletedResources( pubts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( + Map startedResources = loadStartedResources( pubts.getStartedResourcesIterator()); assertTrue(completedResources.isEmpty()); assertEquals(2, startedResources.size()); @@ -774,34 +958,49 @@ public void testStartResourceLocalization() throws IOException { startedResources.get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, startedResources.get(pubRsrcProto2)); - userResources = loadUserResources(state.getIterator()); + Map userResources = + loadUserResources(state.getIterator()); + assertEquals(0, userResources.size()); + } + + @Test + public void testStartResourceLocalizationForPrivateResource() + throws IOException { + Path privRsrcPath = new Path("hdfs://some/private/resource"); + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( + URL.fromPath(privRsrcPath), + LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, + 789L, 680L, "*pattern*"); + LocalResourceProto privRsrcProto = rsrcPb.getProto(); + Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); + String user = "somebody"; + stateStore.startResourceLocalization(user, null, privRsrcProto, + privRsrcLocalPath); + + // restart and verify resources are marked in-progress + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + Map userResources = + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); - rur = userResources.get(user); - privts = rur.getPrivateTrackerState(); + RecoveredUserResources rur = userResources.get(user); + LocalResourceTrackerState privts = rur.getPrivateTrackerState(); assertNotNull(privts); - completedResources = loadCompletedResources( + List completedResources = loadCompletedResources( privts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( + Map startedResources = loadStartedResources( privts.getStartedResourcesIterator()); assertTrue(completedResources.isEmpty()); assertEquals(1, startedResources.size()); assertEquals(privRsrcLocalPath, startedResources.get(privRsrcProto)); - assertEquals(1, rur.getAppTrackerStates().size()); - appts = rur.getAppTrackerStates().get(appId); - assertNotNull(appts); - completedResources = loadCompletedResources( - appts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( - appts.getStartedResourcesIterator()); - assertTrue(completedResources.isEmpty()); - assertEquals(1, startedResources.size()); - assertEquals(appRsrcLocalPath, - startedResources.get(appRsrcProto)); + assertEquals(0, rur.getAppTrackerStates().size()); } @Test - public void testFinishResourceLocalization() throws IOException { + public void testFinishResourceLocalizationForApplicationResource() + throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); @@ -862,10 +1061,14 @@ public void testFinishResourceLocalization() throws IOException { assertEquals(1, completedResources.size()); assertEquals(appLocalizedProto, completedResources.iterator().next()); + } - // start some public and private resources + @Test + public void testFinishResourceLocalizationForPublicResources() + throws IOException { Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( URL.fromPath(pubRsrcPath1), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 789L, 135L); @@ -882,15 +1085,6 @@ public void testFinishResourceLocalization() throws IOException { Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); stateStore.startResourceLocalization(null, null, pubRsrcProto2, pubRsrcLocalPath2); - Path privRsrcPath = new Path("hdfs://some/private/resource"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( - URL.fromPath(privRsrcPath), - LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, - 789L, 680L, "*pattern*"); - LocalResourceProto privRsrcProto = rsrcPb.getProto(); - Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); - stateStore.startResourceLocalization(user, null, privRsrcProto, - privRsrcLocalPath); // finish some of the resources LocalizedResourceProto pubLocalizedProto1 = @@ -900,6 +1094,43 @@ public void testFinishResourceLocalization() throws IOException { .setSize(pubRsrcProto1.getSize()) .build(); stateStore.finishResourceLocalization(null, null, pubLocalizedProto1); + + // restart and verify state + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + List completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + Map startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertEquals(1, completedResources.size()); + assertEquals(pubLocalizedProto1, + completedResources.iterator().next()); + assertEquals(1, startedResources.size()); + assertEquals(pubRsrcLocalPath2, + startedResources.get(pubRsrcProto2)); + Map userResources = + loadUserResources(state.getIterator()); + assertEquals(0, userResources.size()); + } + + @Test + public void testFinishResourceLocalizationForPrivateResource() + throws IOException { + String user = "somebody"; + ApplicationId appId = ApplicationId.newInstance(1, 1); + + Path privRsrcPath = new Path("hdfs://some/private/resource"); + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( + URL.fromPath(privRsrcPath), + LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, + 789L, 680L, "*pattern*"); + LocalResourceProto privRsrcProto = rsrcPb.getProto(); + Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); + stateStore.startResourceLocalization(user, null, privRsrcProto, + privRsrcLocalPath); + LocalizedResourceProto privLocalizedProto = LocalizedResourceProto.newBuilder() .setResource(privRsrcProto) @@ -910,22 +1141,19 @@ public void testFinishResourceLocalization() throws IOException { // restart and verify state restartStateStore(); - state = stateStore.loadLocalizationState(); - pubts = state.getPublicTrackerState(); - completedResources = loadCompletedResources( + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + List completedResources = loadCompletedResources( pubts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( + Map startedResources = loadStartedResources( pubts.getStartedResourcesIterator()); - assertEquals(1, completedResources.size()); - assertEquals(pubLocalizedProto1, - completedResources.iterator().next()); - assertEquals(1, startedResources.size()); - assertEquals(pubRsrcLocalPath2, - startedResources.get(pubRsrcProto2)); - userResources = loadUserResources(state.getIterator()); + assertEquals(0, completedResources.size()); + assertEquals(0, startedResources.size()); + Map userResources = + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); - rur = userResources.get(user); - privts = rur.getPrivateTrackerState(); + RecoveredUserResources rur = userResources.get(user); + LocalResourceTrackerState privts = rur.getPrivateTrackerState(); assertNotNull(privts); completedResources = loadCompletedResources( privts.getCompletedResourcesIterator()); @@ -935,21 +1163,16 @@ public void testFinishResourceLocalization() throws IOException { assertEquals(privLocalizedProto, completedResources.iterator().next()); assertTrue(startedResources.isEmpty()); - assertEquals(1, rur.getAppTrackerStates().size()); - appts = rur.getAppTrackerStates().get(appId); - assertNotNull(appts); - completedResources = loadCompletedResources( - appts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( - appts.getStartedResourcesIterator()); + assertEquals(0, rur.getAppTrackerStates().size()); + LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); + assertNull(appts); assertTrue(startedResources.isEmpty()); assertEquals(1, completedResources.size()); - assertEquals(appLocalizedProto, - completedResources.iterator().next()); } @Test - public void testRemoveLocalizedResource() throws IOException { + public void testRemoveLocalizedResourceForApplicationResource() + throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); @@ -983,10 +1206,15 @@ public void testRemoveLocalizedResource() throws IOException { restartStateStore(); verifyEmptyState(); + } - // add public and private resources and remove some + @Test + public void testRemoveLocalizedResourceForPublicResources() + throws IOException { + // add public resources and remove some Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( URL.fromPath(pubRsrcPath1), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 789L, 135L); @@ -1018,8 +1246,32 @@ public void testRemoveLocalizedResource() throws IOException { .build(); stateStore.finishResourceLocalization(null, null, pubLocalizedProto2); stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2); + + // restart and verify state + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + List completedResources = + loadCompletedResources(pubts.getCompletedResourcesIterator()); + Map startedResources = + loadStartedResources(pubts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); + assertEquals(pubLocalizedProto1, + completedResources.iterator().next()); + Map userResources = + loadUserResources(state.getIterator()); + assertTrue(userResources.isEmpty()); + } + + @Test + public void testRemoveLocalizedResourceForPrivateResource() + throws IOException { + String user = "somebody"; + Path privRsrcPath = new Path("hdfs://some/private/resource"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( URL.fromPath(privRsrcPath), LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, 789L, 680L, "*pattern*"); @@ -1038,9 +1290,7 @@ public void testRemoveLocalizedResource() throws IOException { Map startedResources = loadStartedResources(pubts.getStartedResourcesIterator()); assertTrue(startedResources.isEmpty()); - assertEquals(1, completedResources.size()); - assertEquals(pubLocalizedProto1, - completedResources.iterator().next()); + assertEquals(0, completedResources.size()); Map userResources = loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty()); @@ -1574,9 +1824,9 @@ private StartContainerRequest storeMockContainer(ContainerId containerId) containerCmds.add("somearg"); Map serviceData = new HashMap(); serviceData.put("someservice", - ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3})); ByteBuffer containerTokens = ByteBuffer - .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + .wrap(new byte[] {0x7, 0x8, 0x9, 0xa}); Map acls = new HashMap(); acls.put(ApplicationAccessType.VIEW_APP, "viewuser");