diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 46b5850..71466cb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -49,10 +49,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; public class TestDistributedShell { @@ -62,10 +65,14 @@ protected MiniYARNCluster yarnCluster = null; protected YarnConfiguration conf = null; private static final int NUM_NMS = 1; + private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_aggregator"; protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); + @Rule + public TestName currTestName= new TestName(); + @Before public void setup() throws Exception { setupInternal(NUM_NMS); @@ -74,18 +81,30 @@ public void setup() throws Exception { protected void setupInternal(int numNodeManager) throws Exception { LOG.info("Starting up YARN cluster"); - + conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.set("yarn.log.dir", "target"); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // mark if we need to launch the v1 timeline server + boolean enableATSV1 = false; + if (!currTestName.getMethodName().toLowerCase().contains("v2")) { + // disable aux-service based timeline aggregators + conf.set(YarnConfiguration.NM_AUX_SERVICES, ""); + enableATSV1 = true; + } else { + // enable aux-service based timeline aggregators + conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); + conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME + + ".class", PerNodeAggregatorServer.class.getName()); + } conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); - + if (yarnCluster == null) { yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, - numNodeManager, 1, 1, true); + numNodeManager, 1, 1, enableATSV1); yarnCluster.init(conf); yarnCluster.start(); @@ -140,15 +159,21 @@ public void tearDown() throws IOException { @Test(timeout=90000) public void testDSShellWithDomain() throws Exception { - testDSShell(true); + testDSShell(true, "v1"); } @Test(timeout=90000) public void testDSShellWithoutDomain() throws Exception { - testDSShell(false); + testDSShell(false, "v1"); } - public void testDSShell(boolean haveDomain) throws Exception { + @Test(timeout=90000) + public void testDSShellWithoutDomainV2() throws Exception { + testDSShell(false, "v2"); + } + + public void testDSShell(boolean haveDomain, String timelineVersion) + throws Exception { String[] args = { "--jar", APPMASTER_JAR, @@ -175,9 +200,17 @@ public void testDSShell(boolean haveDomain) throws Exception { "writer_user writer_group", "--create" }; - List argsList = new ArrayList(Arrays.asList(args)); - argsList.addAll(Arrays.asList(domainArgs)); - args = argsList.toArray(new String[argsList.size()]); + args = mergeArgs(args, domainArgs); + } + boolean isTestingTimelineV2 = false; + if (timelineVersion.equalsIgnoreCase("v2")) { + String[] timelineArgs = { + "--timeline_service_version", + "v2" + }; + isTestingTimelineV2 = true; + args = mergeArgs(args, timelineArgs); + LOG.info("Setup: Using timeline v2!"); } LOG.info("Initializing DS Client"); @@ -231,7 +264,15 @@ public void run() { LOG.info("Client run completed. Result=" + result); Assert.assertTrue(result.get()); - TimelineDomain domain = null; + if (!isTestingTimelineV2) { + checkTimelineV1(haveDomain); + } else { + checkTimelineV2(haveDomain); + } + } + + private void checkTimelineV1(boolean haveDomain) throws Exception { + TimelineDomain domain = null; if (haveDomain) { domain = yarnCluster.getApplicationHistoryServer() .getTimelineStore().getDomain("TEST_DOMAIN"); @@ -275,6 +316,24 @@ public void run() { } } + private void checkTimelineV2(boolean haveDomain) { + // TODO check timeline V2 here after we have a storage layer + } + + /** + * Utility function to merge two String arrays to form a new String array for + * our argumemts. + * + * @param args + * @param newArgs + * @return a String array consists of {args, newArgs} + */ + private String[] mergeArgs(String[] args, String[] newArgs) { + List argsList = new ArrayList(Arrays.asList(args)); + argsList.addAll(Arrays.asList(newArgs)); + return argsList.toArray(new String[argsList.size()]); + } + /* * NetUtils.getHostname() returns a string in the form "hostname/ip". * Sometimes the hostname we get is the FQDN and sometimes the short name. In diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java index 46e5574..e362139 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java @@ -72,6 +72,12 @@ protected void serviceStop() throws Exception { */ public void postEntities(TimelineEntities entities, UserGroupInformation callerUgi) { + // Add this output temporarily for our prototype + // TODO remove this after we have an actual implementation + LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE"); + LOG.info("postEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + // TODO implement if (LOG.isDebugEnabled()) { LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java index 55c6271..deb21c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java @@ -118,8 +118,8 @@ private void startWebApp() { .setConf(conf) .addEndpoint(URI.create("http://" + bindAddress)); timelineRestServer = builder.build(); - // TODO: replace this by an authentification filter in future. - HashMap options = new HashMap(); + // TODO: replace this by an authentication filter in future. + HashMap options = new HashMap<>(); String username = conf.get(HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER); options.put(HADOOP_HTTP_STATIC_USER, username);