From 961dc7f7c1587e1a62ddae23c004ad8754a03b91 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Mon, 8 Apr 2019 17:47:46 +0530 Subject: [PATCH] YARN-9452 --- .../timelineservice/NMTimelinePublisher.java | 12 -- .../security/TestTimelineAuthFilterForV2.java | 172 +++++++++++---------- 2 files changed, 91 insertions(+), 93 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index ba57495..5a4de1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -273,11 +273,7 @@ private void publishContainerResumedEvent( TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); - - long containerStartTime = container.getContainerStartTime(); entity.addEvent(tEvent); - entity - .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, containerId.getApplicationAttemptId().getApplicationId())); } @@ -302,11 +298,7 @@ private void publishContainerPausedEvent( TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); - - long containerStartTime = container.getContainerStartTime(); entity.addEvent(tEvent); - entity - .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, containerId.getApplicationAttemptId().getApplicationId())); } @@ -333,11 +325,7 @@ private void publishContainerKilledEvent( TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); - - long containerStartTime = container.getContainerStartTime(); entity.addEvent(tEvent); - entity - .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, containerId.getApplicationAttemptId().getApplicationId())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java index 95a008a..4808e98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java @@ -34,11 +34,13 @@ import java.io.BufferedReader; import java.io.File; +import java.io.FileFilter; import java.io.FileReader; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.Callable; import org.apache.commons.io.FileUtils; @@ -100,6 +102,8 @@ getKeytabFile()); private static String httpSpnegoPrincipal = KerberosTestUtils. getServerPrincipal(); + private static final String entityType = "dummy_type"; + private static final AtomicInteger uniqueId = new AtomicInteger(0); // First param indicates whether HTTPS access or HTTP access and second param // indicates whether it is kerberos access or token based access. @@ -274,11 +278,20 @@ private static TimelineEntity createEntity(String id, String type) { } private static void verifyEntity(File entityTypeDir, String id, String type) - throws IOException { + throws InterruptedException, IOException { File entityFile = new File(entityTypeDir, id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION); + TimelineEntity entity = null; + for (int i = 0; i < 50; i++) { + if (entityFile.exists()) { + entity = readEntityFile(entityFile); + if (entity != null) { + break; + } + } + Thread.sleep(50); + } assertTrue(entityFile.exists()); - TimelineEntity entity = readEntityFile(entityFile); assertNotNull(entity); assertEquals(id, entity.getId()); assertEquals(type, entity.getType()); @@ -333,7 +346,7 @@ private boolean publishWithRetries(ApplicationId appId, File entityTypeDir, @Test public void testPutTimelineEntities() throws Exception { - final String entityType = "dummy_type"; + final String entityType = "dummy_type" + uniqueId.getAndIncrement(); ApplicationId appId = ApplicationId.newInstance(0, 1); File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() + File.separator + "entities" + File.separator + @@ -342,92 +355,89 @@ public void testPutTimelineEntities() throws Exception { File.separator + "test_flow_name" + File.separator + "test_flow_version" + File.separator + "1" + File.separator + appId.toString() + File.separator + entityType); - try { - if (withKerberosLogin) { - KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable() { - @Override - public Void call() throws Exception { - publishAndVerifyEntity(appId, entityTypeDir, entityType, 1); - return null; - } - }); - } else { - assertTrue("Entities should have been published successfully.", - publishWithRetries(appId, entityTypeDir, entityType, 1)); - - AppLevelTimelineCollector collector = - (AppLevelTimelineCollector) collectorManager.get(appId); - Token token = - collector.getDelegationTokenForApp(); - assertNotNull(token); + if (withKerberosLogin) { + KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable() { + @Override + public Void call() throws Exception { + publishAndVerifyEntity(appId, entityTypeDir, entityType, 1); + return null; + } + }); + } else { + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 1)); - // Verify if token is renewed automatically and entities can still be - // published. - Thread.sleep(1000); - // Entities should publish successfully after renewal. - assertTrue("Entities should have been published successfully.", - publishWithRetries(appId, entityTypeDir, entityType, 2)); - assertNotNull(collector); - verify(collectorManager.getTokenManagerService(), atLeastOnce()). - renewToken(eq(collector.getDelegationTokenForApp()), - any(String.class)); + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector) collectorManager.get(appId); + Token token = + collector.getDelegationTokenForApp(); + assertNotNull(token); - // Wait to ensure lifetime of token expires and ensure its regenerated - // automatically. - Thread.sleep(3000); - for (int i = 0; i < 40; i++) { - if (!token.equals(collector.getDelegationTokenForApp())) { - break; - } - Thread.sleep(50); - } - assertNotEquals("Token should have been regenerated.", token, - collector.getDelegationTokenForApp()); - Thread.sleep(1000); - // Try publishing with the old token in UGI. Publishing should fail due - // to invalid token. - try { - publishAndVerifyEntity(appId, entityTypeDir, entityType, 2); - fail("Exception should have been thrown due to Invalid Token."); - } catch (YarnException e) { - assertTrue("Exception thrown should have been due to Invalid Token.", - e.getCause().getMessage().contains("InvalidToken")); - } + // Verify if token is renewed automatically and entities can still be + // published. + Thread.sleep(1000); + // Entities should publish successfully after renewal. + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 2)); + assertNotNull(collector); + verify(collectorManager.getTokenManagerService(), atLeastOnce()). + renewToken(eq(collector.getDelegationTokenForApp()), + any(String.class)); - // Update the regenerated token in UGI and retry publishing entities. - Token regeneratedToken = - collector.getDelegationTokenForApp(); - regeneratedToken.setService(new Text("localhost" + - regeneratedToken.getService().toString().substring( - regeneratedToken.getService().toString().indexOf(":")))); - UserGroupInformation.getCurrentUser().addToken(regeneratedToken); - assertTrue("Entities should have been published successfully.", - publishWithRetries(appId, entityTypeDir, entityType, 2)); - // Token was generated twice, once when app collector was created and - // later after token lifetime expiry. - verify(collectorManager.getTokenManagerService(), times(2)). - generateToken(any(UserGroupInformation.class), any(String.class)); - assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager). - getTokenExpiredCnt()); - } - // Wait for async entity to be published. - for (int i = 0; i < 50; i++) { - if (entityTypeDir.listFiles().length == 2) { + // Wait to ensure lifetime of token expires and ensure its regenerated + // automatically. + Thread.sleep(3000); + for (int i = 0; i < 40; i++) { + if (!token.equals(collector.getDelegationTokenForApp())) { break; } Thread.sleep(50); } - assertEquals(2, entityTypeDir.listFiles().length); - verifyEntity(entityTypeDir, "entity2", entityType); - AppLevelTimelineCollector collector = - (AppLevelTimelineCollector)collectorManager.get(appId); - assertNotNull(collector); - auxService.removeApplication(appId); - verify(collectorManager.getTokenManagerService()).cancelToken( - eq(collector.getDelegationTokenForApp()), any(String.class)); - } finally { - FileUtils.deleteQuietly(entityTypeDir); + assertNotEquals("Token should have been regenerated.", token, + collector.getDelegationTokenForApp()); + Thread.sleep(1000); + // Try publishing with the old token in UGI. Publishing should fail due + // to invalid token. + try { + publishAndVerifyEntity(appId, entityTypeDir, entityType, 2); + fail("Exception should have been thrown due to Invalid Token."); + } catch (YarnException e) { + assertTrue("Exception thrown should have been due to Invalid Token.", + e.getCause().getMessage().contains("InvalidToken")); + } + + // Update the regenerated token in UGI and retry publishing entities. + Token regeneratedToken = + collector.getDelegationTokenForApp(); + regeneratedToken.setService(new Text("localhost" + + regeneratedToken.getService().toString().substring( + regeneratedToken.getService().toString().indexOf(":")))); + UserGroupInformation.getCurrentUser().addToken(regeneratedToken); + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 2)); + // Token was generated twice, once when app collector was created and + // later after token lifetime expiry. + verify(collectorManager.getTokenManagerService(), times(2)). + generateToken(any(UserGroupInformation.class), any(String.class)); + assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager). + getTokenExpiredCnt()); + } + // Wait for async entity to be published. + FileFilter tmpFilter = (pathname -> !pathname.getName().endsWith(".tmp")); + for (int i = 0; i < 50; i++) { + if (entityTypeDir.listFiles(tmpFilter).length == 2) { + break; + } + Thread.sleep(50); } + assertEquals(2, entityTypeDir.listFiles(tmpFilter).length); + verifyEntity(entityTypeDir, "entity2", entityType); + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector)collectorManager.get(appId); + assertNotNull(collector); + auxService.removeApplication(appId); + verify(collectorManager.getTokenManagerService()).cancelToken( + eq(collector.getDelegationTokenForApp()), any(String.class)); } private static class DummyNodeTimelineCollectorManager extends -- 2.7.4 (Apple Git-66)