diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index f52e654..c379987 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -19,9 +19,6 @@ package org.apache.hadoop.mapreduce.jobhistory; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -30,7 +27,11 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -46,7 +47,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -56,9 +56,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; -import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; @@ -74,9 +74,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The job history events get routed to this class. This class writes the Job * history events to the DFS directly into a staging dir and then moved to a @@ -122,13 +121,6 @@ protected static final Map fileMap = Collections.synchronizedMap(new HashMap()); - - // For posting entities in new timeline service in a non-blocking way - // TODO YARN-3367 replace with event loop in TimelineClient. - private static ExecutorService threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); // should job completion be force when the AM shuts down? 
protected volatile boolean forceJobCompletion = false; @@ -137,6 +129,10 @@ private boolean newTimelineServiceEnabled = false; + // For posting entities in new timeline service in a non-blocking way + // TODO YARN-3367 replace with event loop in TimelineClient. + private ExecutorService threadPool; + private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT"; @@ -265,22 +261,26 @@ protected void serviceInit(Configuration conf) throws Exception { // configuration status: off, on_with_v1 or on_with_v2. if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - + LOG.info("Emitting job history data to the timeline server is enabled"); + if (YarnConfiguration.timelineServiceEnabled(conf)) { + timelineClient = ((MRAppMaster.RunningAppContext)context).getTimelineClient(); timelineClient.init(conf); - newTimelineServiceEnabled = conf.getBoolean( - MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, - MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); - LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1")); - LOG.info("Emitting job history data to the timeline server is enabled"); + newTimelineServiceEnabled = + YarnConfiguration.getTimelineServiceVersion(conf) == 2; + LOG.info("Timeline service is enabled; version: " + + (newTimelineServiceEnabled? "v2" : "v1")); + if (newTimelineServiceEnabled) { + // initialize the thread pool for v.2 timeline service + threadPool = createThreadPool(); + } } else { LOG.info("Timeline service is not enabled"); } } else { - LOG.info("Emitting job history data to the timeline server is not enabled"); + LOG.info("Emitting job history data to the timeline server is not " + + "enabled"); } // Flag for setting @@ -448,19 +448,27 @@ protected void serviceStop() throws Exception { if (timelineClient != null) { timelineClient.stop(); } - shutdownAndAwaitTermination(); + if (threadPool != null) { + shutdownAndAwaitTermination(); + } LOG.info("Stopped JobHistoryEventHandler. 
super.stop()"); super.serviceStop(); } // TODO remove threadPool after adding non-blocking call in TimelineClient - private static void shutdownAndAwaitTermination() { + private ExecutorService createThreadPool() { + return Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); + } + + private void shutdownAndAwaitTermination() { threadPool.shutdown(); try { if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); + LOG.error("ThreadPool did not terminate"); } } catch (InterruptedException ie) { threadPool.shutdownNow(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index dafb6e9..ff0f6a6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -1017,14 +1017,9 @@ public RunningAppContext(Configuration config, this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor; if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA) - && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - - boolean newTimelineServiceEnabled = conf.getBoolean( - MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, - MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); - - if (newTimelineServiceEnabled) { + && YarnConfiguration.timelineServiceEnabled(conf)) { + + if (YarnConfiguration.getTimelineServiceVersion(conf) == 2) { // create new version TimelineClient timelineClient = TimelineClient.createTimelineClient( appAttemptID.getApplicationId()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 3ab6eeb..3d1e841 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -467,11 +467,6 @@ "mapreduce.job.emit-timeline-data"; public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA = false; - - public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED = - "mapreduce.job.new-timeline-service.enabled"; - public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED = - false; public static final String MR_PREFIX = "yarn.app.mapreduce."; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 0585234..6ece048 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -618,13 +618,6 @@ - - mapreduce.job.new-timeline-service.enabled - false - Specifies if posting job and task events to new timeline service. - - - mapreduce.input.fileinputformat.split.minsize 0 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index b3ea26e..5a0c979 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -161,11 +161,10 @@ public void testMRNewTimelineServiceEventHandling() throws Exception { LOG.info("testMRNewTimelineServiceEventHandling start."); Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // enable new timeline serivce + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); - // enable new timeline serivce in MR side - conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true); - // enable aux-service based timeline collectors conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." 
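With mapreduce.job.new-timeline-service.enabled removed, selecting timeline v.2 for an MR job is now purely a matter of the generic YARN settings, as the updated test above shows. A hypothetical client-side setup using the constants referenced in this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

class TimelineV2JobConf {
  static Configuration create() {
    Configuration conf = new YarnConfiguration();
    // turn the timeline service on and pick v.2 by version number
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
    // the MR-side switch for emitting job history data is still required
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
    return conf;
  }
}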
+ TIMELINE_AUX_SERVICE_NAME diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java index 18a4c14..edb825d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java @@ -173,7 +173,7 @@ public void serviceInit(Configuration conf) throws Exception { boolean enableTimelineAuxService = false; if (nmAuxServices != null) { for (String nmAuxService: nmAuxServices) { - if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) { + if (nmAuxService.equals(TIMELINE_AUX_SERVICE_NAME)) { enableTimelineAuxService = true; break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7f568a6..04b3c68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1569,6 +1569,9 @@ private static void addDeprecatedKeys() { public static final String TIMELINE_SERVICE_PREFIX = YARN_PREFIX + "timeline-service."; + public static final String TIMELINE_SERVICE_VERSION = TIMELINE_SERVICE_PREFIX + + "version"; + public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f; /** * Comma seperated list of names for UIs hosted in the timeline server * (For pluggable UIs). @@ -2341,13 +2344,51 @@ public static String getClusterId(Configuration conf) { } return clusterId; } - - public static boolean systemMetricsPublisherEnabled(Configuration conf) { + + // helper methods for timeline service configuration + /** + * Returns whether the timeline service is enabled via configuration. + * + * @param conf the configuration + * @return whether the timelien service is enabled. + */ + public static boolean timelineServiceEnabled(Configuration conf) { return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + } + + /** + * Returns the timeline service version. It does not check whether the + * timeline service itself is enabled. + * + * @param conf the configuration + * @return the timeline service version as a float. + */ + public static float getTimelineServiceVersion(Configuration conf) { + return conf.getFloat(TIMELINE_SERVICE_VERSION, + DEFAULT_TIMELINE_SERVICE_VERSION); + } + + /** + * Returns whether the timeline service v.2 is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service v.2 is enabled. 
+ */ + public static boolean timelineServiceV2Enabled(Configuration conf) { + return timelineServiceEnabled(conf) && getTimelineServiceVersion(conf) == 2; + } + + /** + * Returns whether the system publisher is enabled (for the timeline service + * v.2). It does not check whether the timeline service v.2 itself is enabled. + * + * @param conf the configuration + * @return whether the system publisher is enabled. + */ + public static boolean systemMetricsPublisherEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); } /* For debugging. mp configurations to system output as XML format. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index e27c947..35b869b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -221,10 +221,7 @@ // For posting entities in new timeline service in a non-blocking way // TODO replace with event loop in TimelineClient. - private static ExecutorService threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); + private ExecutorService threadPool; // App Master configuration // No. of containers to run shell command on @@ -314,8 +311,10 @@ public static void main(String[] args) { } appMaster.run(); result = appMaster.finish(); - - shutdownAndAwaitTermination(); + + if (appMaster.threadPool != null) { + appMaster.shutdownAndAwaitTermination(); + } } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); @@ -329,16 +328,22 @@ public static void main(String[] args) { System.exit(2); } } - + //TODO remove threadPool after adding non-blocking call in TimelineClient - private static void shutdownAndAwaitTermination() { + private ExecutorService createThreadPool() { + return Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); + } + + private void shutdownAndAwaitTermination() { threadPool.shutdown(); try { // Wait a while for existing tasks to terminate if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); + LOG.error("ThreadPool did not terminate"); } } catch (InterruptedException ie) { threadPool.shutdownNow(); @@ -404,8 +409,7 @@ public boolean init(String[] args) throws ParseException, IOException { "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. 
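The new YarnConfiguration helpers above (timelineServiceEnabled, getTimelineServiceVersion, timelineServiceV2Enabled) centralize the version checks that callers previously open-coded, and systemMetricsPublisherEnabled no longer implies that the timeline service itself is on, so the two checks have to be combined explicitly where both matter. A small sketch of the intended read side (the helper class here is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

class TimelineVersionCheck {
  /** Returns "none", "v1" or "v2" for logging/branching purposes. */
  static String publishingMode(Configuration conf) {
    if (!YarnConfiguration.timelineServiceEnabled(conf)) {
      return "none";
    }
    // timelineServiceV2Enabled() == enabled && version == 2
    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
      return "v2";
    }
    return "v1 (configured version "
        + YarnConfiguration.getTimelineServiceVersion(conf) + ")";
  }

  /** Both checks are needed before publishing system metrics via v.2. */
  static boolean publishSystemMetricsV2(Configuration conf) {
    return YarnConfiguration.timelineServiceV2Enabled(conf)
        && YarnConfiguration.systemMetricsPublisherEnabled(conf);
  }
}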
Default 0"); opts.addOption("debug", false, "Dump out debug information"); - opts.addOption("timeline_service_version", true, - "Version for timeline service"); + opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -542,27 +546,15 @@ public boolean init(String[] args) throws ParseException, IOException { requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - if (cliParser.hasOption("timeline_service_version")) { - String timelineServiceVersion = - cliParser.getOptionValue("timeline_service_version", "v1"); - if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) { - newTimelineService = false; - } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) { - newTimelineService = true; - } else { - throw new IllegalArgumentException( - "timeline_service_version is not set properly, should be 'v1' or 'v2'"); - } + if (YarnConfiguration.timelineServiceEnabled(conf)) { + newTimelineService = + YarnConfiguration.getTimelineServiceVersion(conf) == 2; + if (newTimelineService) { + threadPool = createThreadPool(); } } else { timelineClient = null; LOG.warn("Timeline service is not enabled"); - if (cliParser.hasOption("timeline_service_version")) { - throw new IllegalArgumentException( - "Timeline service is not enabled"); - } } return true; @@ -623,16 +615,16 @@ public void run() throws YarnException, IOException, InterruptedException { nmClientAsync.start(); startTimelineClient(conf); - // need to bind timelineClient - amRMClient.registerTimelineClient(timelineClient); + if (newTimelineService) { + // need to bind timelineClient + amRMClient.registerTimelineClient(timelineClient); + } if(timelineClient != null) { if (newTimelineService) { - publishApplicationAttemptEventOnNewTimelineService(timelineClient, - appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, - appSubmitterUgi); + publishApplicationAttemptEventOnNewTimelineService( + DSEvent.DS_APP_ATTEMPT_START); } else { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_START); } } @@ -703,8 +695,7 @@ void startTimelineClient(final Configuration conf) appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + if (YarnConfiguration.timelineServiceEnabled(conf)) { // Creating the Timeline Client if (newTimelineService) { timelineClient = TimelineClient.createTimelineClient( @@ -743,12 +734,10 @@ protected boolean finish() { if (timelineClient != null) { if (newTimelineService) { - publishApplicationAttemptEventOnNewTimelineService(timelineClient, - appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId, - appSubmitterUgi); + publishApplicationAttemptEventOnNewTimelineService( + DSEvent.DS_APP_ATTEMPT_END); } else { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); + publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_END); } } @@ -856,11 +845,9 @@ public void onContainersCompleted(List completedContainers) { } if(timelineClient != null) { if (newTimelineService) { - publishContainerEndEventOnNewTimelineService( - timelineClient, 
containerStatus, domainId, appSubmitterUgi); + publishContainerEndEventOnNewTimelineService(containerStatus); } else { - publishContainerEndEvent( - timelineClient, containerStatus, domainId, appSubmitterUgi); + publishContainerEndEvent(containerStatus); } } } @@ -982,13 +969,10 @@ public void onContainerStarted(ContainerId containerId, } if(applicationMaster.timelineClient != null) { if (applicationMaster.newTimelineService) { - ApplicationMaster.publishContainerStartEventOnNewTimelineService( - applicationMaster.timelineClient, container, - applicationMaster.domainId, applicationMaster.appSubmitterUgi); + applicationMaster.publishContainerStartEventOnNewTimelineService( + container); } else { - ApplicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container, - applicationMaster.domainId, applicationMaster.appSubmitterUgi); + applicationMaster.publishContainerStartEvent(container); } } } @@ -1195,14 +1179,12 @@ private String readContent(String filePath) throws IOException { } } - private static void publishContainerStartEvent( - final TimelineClient timelineClient, Container container, String domainId, - UserGroupInformation ugi) { + private void publishContainerStartEvent(Container container) { final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_START.toString()); @@ -1211,12 +1193,13 @@ private static void publishContainerStartEvent( entity.addEvent(event); try { - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public TimelinePutResponse run() throws Exception { - return timelineClient.putEntities(entity); - } - }); + appSubmitterUgi.doAs( + new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); } catch (Exception e) { LOG.error("Container start event could not be published for " + container.getId().toString(), @@ -1224,14 +1207,12 @@ public TimelinePutResponse run() throws Exception { } } - private static void publishContainerEndEvent( - final TimelineClient timelineClient, ContainerStatus container, - String domainId, UserGroupInformation ugi) { + private void publishContainerEndEvent(ContainerStatus container) { final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getContainerId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_END.toString()); @@ -1246,14 +1227,12 @@ private static void publishContainerEndEvent( } } - private static void publishApplicationAttemptEvent( - final TimelineClient timelineClient, String appAttemptId, - DSEvent appEvent, String domainId, UserGroupInformation ugi) { + private void publishApplicationAttemptEvent(DSEvent appEvent) { final TimelineEntity entity = new TimelineEntity(); - entity.setEntityId(appAttemptId); + entity.setEntityId(appAttemptID.toString()); 
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); @@ -1264,7 +1243,7 @@ private static void publishApplicationAttemptEvent( LOG.error("App Attempt " + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + " event could not be published for " - + appAttemptId.toString(), e); + + appAttemptID, e); } } @@ -1296,27 +1275,24 @@ Thread createLaunchContainerThread(Container allocatedContainer, return new Thread(runnableLaunchContainer); } - private static void publishContainerStartEventOnNewTimelineService( - final TimelineClient timelineClient, final Container container, - final String domainId, final UserGroupInformation ugi) { + private void publishContainerStartEventOnNewTimelineService( + final Container container) { Runnable publishWrapper = new Runnable() { public void run() { - publishContainerStartEventOnNewTimelineServiceBase(timelineClient, - container, domainId, ugi); + publishContainerStartEventOnNewTimelineServiceBase(container); } }; threadPool.execute(publishWrapper); } - private static void publishContainerStartEventOnNewTimelineServiceBase( - final TimelineClient timelineClient, Container container, String domainId, - UserGroupInformation ugi) { + private void publishContainerStartEventOnNewTimelineServiceBase( + Container container) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); //entity.setDomainId(domainId); - entity.addInfo("user", ugi.getShortUserName()); + entity.addInfo("user", appSubmitterUgi.getShortUserName()); org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); @@ -1327,7 +1303,7 @@ private static void publishContainerStartEventOnNewTimelineServiceBase( entity.addEvent(event); try { - ugi.doAs(new PrivilegedExceptionAction() { + appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override public TimelinePutResponse run() throws Exception { timelineClient.putEntities(entity); @@ -1341,27 +1317,24 @@ public TimelinePutResponse run() throws Exception { } } - private static void publishContainerEndEventOnNewTimelineService( - final TimelineClient timelineClient, final ContainerStatus container, - final String domainId, final UserGroupInformation ugi) { + private void publishContainerEndEventOnNewTimelineService( + final ContainerStatus container) { Runnable publishWrapper = new Runnable() { public void run() { - publishContainerEndEventOnNewTimelineServiceBase(timelineClient, - container, domainId, ugi); + publishContainerEndEventOnNewTimelineServiceBase(container); } }; threadPool.execute(publishWrapper); } - private static void publishContainerEndEventOnNewTimelineServiceBase( - final TimelineClient timelineClient, final ContainerStatus container, - final String domainId, final UserGroupInformation ugi) { + private void publishContainerEndEventOnNewTimelineServiceBase( + final ContainerStatus container) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); 
entity.setId(container.getContainerId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); //entity.setDomainId(domainId); - entity.addInfo("user", ugi.getShortUserName()); + entity.addInfo("user", appSubmitterUgi.getShortUserName()); org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); @@ -1371,7 +1344,7 @@ private static void publishContainerEndEventOnNewTimelineServiceBase( entity.addEvent(event); try { - ugi.doAs(new PrivilegedExceptionAction() { + appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override public TimelinePutResponse run() throws Exception { timelineClient.putEntities(entity); @@ -1385,29 +1358,25 @@ public TimelinePutResponse run() throws Exception { } } - private static void publishApplicationAttemptEventOnNewTimelineService( - final TimelineClient timelineClient, final String appAttemptId, - final DSEvent appEvent, final String domainId, - final UserGroupInformation ugi) { + private void publishApplicationAttemptEventOnNewTimelineService( + final DSEvent appEvent) { Runnable publishWrapper = new Runnable() { public void run() { - publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, - appAttemptId, appEvent, domainId, ugi); + publishApplicationAttemptEventOnNewTimelineServiceBase(appEvent); } }; threadPool.execute(publishWrapper); } - private static void publishApplicationAttemptEventOnNewTimelineServiceBase( - final TimelineClient timelineClient, String appAttemptId, - DSEvent appEvent, String domainId, UserGroupInformation ugi) { + private void publishApplicationAttemptEventOnNewTimelineServiceBase( + DSEvent appEvent) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); - entity.setId(appAttemptId); + entity.setId(appAttemptID.toString()); entity.setType(DSEntity.DS_APP_ATTEMPT.toString()); //entity.setDomainId(domainId); - entity.addInfo("user", ugi.getShortUserName()); + entity.addInfo("user", appSubmitterUgi.getShortUserName()); org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setId(appEvent.toString()); @@ -1415,7 +1384,7 @@ private static void publishApplicationAttemptEventOnNewTimelineServiceBase( entity.addEvent(event); try { - ugi.doAs(new PrivilegedExceptionAction() { + appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override public TimelinePutResponse run() throws Exception { timelineClient.putEntities(entity); @@ -1426,7 +1395,7 @@ public TimelinePutResponse run() throws Exception { LOG.error("App Attempt " + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + " event could not be published for " - + appAttemptId.toString(), + + appAttemptID, e instanceof UndeclaredThrowableException ? 
e.getCause() : e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 2819c91..e66005e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -192,8 +192,6 @@ // Command line options private Options opts; - private String timelineServiceVersion; - private static final String shellCommandPath = "shellCommands"; private static final String shellArgsPath = "shellArgs"; private static final String appMasterJarPath = "AppMaster.jar"; @@ -269,7 +267,6 @@ public Client(Configuration conf) throws Exception { opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); - opts.addOption("timeline_service_version", true, "Version for timeline service"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application attempts." + " If the flag is true, running containers will not be killed when" + @@ -371,16 +368,6 @@ public boolean init(String[] args) throws ParseException { + " Specified virtual cores=" + amVCores); } - if (cliParser.hasOption("timeline_service_version")) { - timelineServiceVersion = - cliParser.getOptionValue("timeline_service_version", "v1"); - if (! 
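In the distributed shell ApplicationMaster, the publish helpers stop being static and no longer take the client, domain, and UGI as parameters; they read the instance fields and, for v.2, hand the blocking put to the AM's thread pool so callback threads are never blocked. A simplified sketch of that wrap-and-submit pattern (the class below is illustrative, not the AM itself):

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;

class AsyncEntityPublisher {
  private final TimelineClient timelineClient;
  private final UserGroupInformation appSubmitterUgi;
  private final ExecutorService threadPool;

  AsyncEntityPublisher(TimelineClient client, UserGroupInformation ugi,
      ExecutorService pool) {
    this.timelineClient = client;
    this.appSubmitterUgi = ugi;
    this.threadPool = pool;
  }

  void publish(final TimelineEntity entity) {
    // the caller returns immediately; a pool thread does the blocking put
    threadPool.execute(new Runnable() {
      @Override
      public void run() {
        try {
          appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
              timelineClient.putEntities(entity);
              return null;
            }
          });
        } catch (Exception e) {
          // log in real code; the patch unwraps UndeclaredThrowableException
        }
      }
    });
  }
}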
(timelineServiceVersion.trim().equalsIgnoreCase("v1") || - timelineServiceVersion.trim().equalsIgnoreCase("v2"))) { - throw new IllegalArgumentException( - "timeline_service_version is not set properly, should be 'v1' or 'v2'"); - } - } - if (!cliParser.hasOption("jar")) { throw new IllegalArgumentException("No jar file specified for application master"); } @@ -690,9 +677,6 @@ public boolean run() throws IOException, YarnException { vargs.add("--debug"); } - if (timelineServiceVersion != null) { - vargs.add("--timeline_service_version " + timelineServiceVersion); - } vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index fe817c3..ff34bcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -107,6 +107,8 @@ protected void setupInternal(int numNodeManager, TestName testName) true); conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false); } else { + // set version to 2 + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); // enable aux-service based timeline collectors conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." @@ -245,12 +247,7 @@ private void testDSShell(boolean haveDomain, String timelineVersion, } boolean isTestingTimelineV2 = false; if (timelineVersion.equalsIgnoreCase("v2")) { - String[] timelineArgs = { - "--timeline_service_version", - "v2" - }; isTestingTimelineV2 = true; - args = mergeArgs(args, timelineArgs); if (!defaultFlow) { String[] flowArgs = { "--flow_name", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 7176146..d096a6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -403,7 +403,7 @@ public synchronized void setCollectorAddr(String collectorAddr) { } @Override - public Priority getApplicationPriority() { + public synchronized Priority getApplicationPriority() { AllocateResponseProtoOrBuilder p = viaProto ? 
proto : builder; if (this.appPriority != null) { return this.appPriority; @@ -416,7 +416,7 @@ public Priority getApplicationPriority() { } @Override - public void setApplicationPriority(Priority priority) { + public synchronized void setApplicationPriority(Priority priority) { maybeInitBuilder(); if (priority == null) builder.clearApplicationPriority(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index f2707ba..9772dc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -48,17 +48,21 @@ * current user may use {@link UserGroupInformation#doAs} another user to * construct and initialize a timeline client if the following operations are * supposed to be conducted by that user. - * - * @return a timeline client */ protected ApplicationId contextAppId; + /** + * Creates an instance of the timeline v.1.x client. + */ @Public public static TimelineClient createTimelineClient() { TimelineClient client = new TimelineClientImpl(); return client; } + /** + * Creates an instance of the timeline v.2 client. + */ @Public public static TimelineClient createTimelineClient(ApplicationId appId) { TimelineClient client = new TimelineClientImpl(appId); @@ -156,8 +160,9 @@ public abstract void cancelDelegationToken( /** *

   * Send the information of a number of conceptual entities to the timeline
-  * aggregator. It is a blocking API. The method will not return until all the
-  * put entities have been persisted.
+  * service v.2 collector. It is a blocking API. The method will not return
+  * until all the put entities have been persisted. If this method is invoked
+  * for a non-v.2 timeline client instance, a YarnException is thrown.
   * <p>
   *
   * @param entities
@@ -173,8 +178,9 @@
  /**
   * <p>
   * Send the information of a number of conceptual entities to the timeline
-  * aggregator. It is an asynchronous API. The method will return once all the
-  * entities are received.
+  * service v.2 collector. It is an asynchronous API. The method will return
+  * once all the entities are received. If this method is invoked for a
+  * non-v.2 timeline client instance, a YarnException is thrown.
   * <p>
* * @param entities diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 8312b6d..2190d91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -353,6 +353,9 @@ public void putEntitiesAsync( private void putEntities(boolean async, org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) throws IOException, YarnException { + if (!newTimelineService) { + throw new YarnException("v.2 method is invoked on a v.1.x client"); + } org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entitiesContainer = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index adc28f0..7defe87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1784,6 +1784,14 @@ + Indicate to clients what is the current version of the running + timeline service. + + yarn.timeline-service.version + 1.0f + + + The hostname of the timeline service web application. yarn.timeline-service.hostname 0.0.0.0 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index fa0cf5c..066abfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -261,10 +261,12 @@ public void setLastKnownNMTokenMasterKey(MasterKey masterKey) { private void initRegisteredCollectors() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? 
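The TimelineClient changes above make the versioning explicit: createTimelineClient() still returns a v.1.x client, createTimelineClient(appId) returns a v.2 client bound to that application, and TimelineClientImpl now fails fast with a YarnException if the v.2 putEntities path is invoked on a v.1.x instance. A short usage sketch (the helper class name is illustrative, error handling trimmed):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

class TimelineClientVersions {
  static void putV2Entity(Configuration conf, ApplicationId appId,
      TimelineEntity entity) throws IOException {
    // bound to an application => v.2 client; the no-arg factory gives v.1.x
    TimelineClient client = TimelineClient.createTimelineClient(appId);
    client.init(conf);
    client.start();
    try {
      // blocking put; putEntitiesAsync() returns once entities are received
      client.putEntities(entity);
    } catch (YarnException e) {
      // thrown, for instance, when this v.2-only call hits a v.1.x client
    } finally {
      client.stop();
    }
  }
}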
proto : builder; List list = p.getRegisteredCollectorsList(); - this.registeredCollectors = new HashMap (); - for (AppCollectorsMapProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + if (!list.isEmpty()) { + this.registeredCollectors = new HashMap<>(); + for (AppCollectorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 2521b9c..151006b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -531,10 +531,12 @@ private void initSystemCredentials() { private void initAppCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; List list = p.getAppCollectorsMapList(); - this.appCollectorsMap = new HashMap (); - for (AppCollectorsMapProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + if (!list.isEmpty()) { + this.appCollectorsMap = new HashMap<>(); + for (AppCollectorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 0b378a1..8fce422 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -64,7 +64,8 @@ /** * Get the registered collectors that located on this NM. 
- * @return registered + * @return registered collectors, or null if the timeline service v.2 is not + * enabled */ Map getRegisteredCollectors(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 601bd04..406a104 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -97,6 +97,7 @@ private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + // the NM collector service is set only if the timeline service v.2 is enabled private NMCollectorService nmCollectorService; private NodeStatusUpdater nodeStatusUpdater; private NodeResourceMonitor nodeResourceMonitor; @@ -356,8 +357,10 @@ protected void serviceInit(Configuration conf) throws Exception { DefaultMetricsSystem.initialize("NodeManager"); - this.nmCollectorService = createNMCollectorService(context); - addService(nmCollectorService); + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + this.nmCollectorService = createNMCollectorService(context); + addService(nmCollectorService); + } // StatusUpdater should be added last so that it get started last // so that we make sure everything is up before registering with RM. @@ -457,8 +460,7 @@ public void run() { protected final ConcurrentMap containers = new ConcurrentSkipListMap(); - protected Map registeredCollectors = - new ConcurrentHashMap(); + protected Map registeredCollectors; protected final ConcurrentMap increasedContainers = @@ -484,6 +486,9 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, NMStateStoreService stateStore, Configuration conf) { + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + this.registeredCollectors = new ConcurrentHashMap<>(); + } this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = dirsHandler; @@ -620,7 +625,9 @@ public void setSystemCrendentialsForApps( public void addRegisteredCollectors( Map newRegisteredCollectors) { - this.registeredCollectors.putAll(newRegisteredCollectors); + if (this.registeredCollectors != null) { + this.registeredCollectors.putAll(newRegisteredCollectors); + } } @Override @@ -709,7 +716,14 @@ public Context getNMContext() { return this.context; } - // For testing + /** + * Returns the NM collector service. It should be used only for testing + * purposes. 
+ * + * @return the NM collector service, or null if the timeline service v.2 is + * not enabled + */ + @VisibleForTesting NMCollectorService getNMCollectorService() { return this.nmCollectorService; } @@ -717,6 +731,7 @@ NMCollectorService getNMCollectorService() { public static void main(String[] args) throws IOException { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); + @SuppressWarnings("resource") NodeManager nodeManager = new NodeManager(); Configuration conf = new YarnConfiguration(); new GenericOptionsParser(conf, args); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index d39204f..7ca936a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -821,7 +821,7 @@ public void run() { dispatcher.getEventHandler().handle( new CMgrSignalContainersEvent(containersToSignal)); } - if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) { + if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { updateTimelineClientsAddress(response); } @@ -850,46 +850,37 @@ public void run() { } } - /** - * Caller should take care of sending non null nodelabels for both - * arguments - * - * @param nodeLabelsNew - * @param nodeLabelsOld - * @return if the New node labels are diff from the older one. - */ - private boolean areNodeLabelsUpdated(Set nodeLabelsNew, - Set nodeLabelsOld) { - if (nodeLabelsNew.size() != nodeLabelsOld.size() - || !nodeLabelsOld.containsAll(nodeLabelsNew)) { - return true; - } - return false; - } - private void updateTimelineClientsAddress( NodeHeartbeatResponse response) { - Set> rmKnownCollectors = - response.getAppCollectorsMap().entrySet(); - for (Map.Entry entry : rmKnownCollectors) { - ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); - - // Only handle applications running on local node. - // Not include apps with timeline collectors running in local - Application application = context.getApplications().get(appId); - if (application != null && - !context.getRegisteredCollectors().containsKey(appId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sync a new collector address: " + collectorAddr + - " for application: " + appId + " from RM."); + Map knownCollectorsMap = + response.getAppCollectorsMap(); + if (knownCollectorsMap == null) { + LOG.warn("the collectors map is null"); + } else { + Set> rmKnownCollectors = + knownCollectorsMap.entrySet(); + for (Map.Entry entry : rmKnownCollectors) { + ApplicationId appId = entry.getKey(); + String collectorAddr = entry.getValue(); + + // Only handle applications running on local node. 
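A consequence of the NodeManager hunks above is a new null contract: the NM collector service, the registeredCollectors map, and the per-heartbeat collector maps are created only when timeline v.2 is enabled (or when the wire list is non-empty), so callers such as NodeStatusUpdaterImpl must treat null as "nothing to do". A tiny defensive wrapper in that spirit (hypothetical helper, assuming the map is keyed by ApplicationId with the collector address as value):

import java.util.Collections;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.nodemanager.Context;

class CollectorLookup {
  /** Null from getRegisteredCollectors() means timeline v.2 is not enabled. */
  static Map<ApplicationId, String> registeredOrEmpty(Context context) {
    Map<ApplicationId, String> collectors = context.getRegisteredCollectors();
    return collectors == null
        ? Collections.<ApplicationId, String>emptyMap()
        : collectors;
  }
}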
+ // Not include apps with timeline collectors running in local + Application application = context.getApplications().get(appId); + if (application != null && + !context.getRegisteredCollectors().containsKey(appId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sync a new collector address: " + collectorAddr + + " for application: " + appId + " from RM."); + } + TimelineClient client = application.getTimelineClient(); + if (client != null) { + client.setTimelineServiceAddress(collectorAddr); + } } - TimelineClient client = application.getTimelineClient(); - client.setTimelineServiceAddress(collectorAddr); } } } - + private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index db79ee5..3ba81ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -43,6 +43,10 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +/** + * Service that handles collector information. It is used only if the timeline + * service v.2 is enabled. + */ public class NMCollectorService extends CompositeService implements CollectorNodemanagerProtocol { @@ -113,9 +117,9 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( String collectorAddr = collector.getCollectorAddr(); newCollectorsMap.put(appId, collectorAddr); // set registered collector address to TimelineClient. 
- if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) { - TimelineClient client = - context.getApplications().get(appId).getTimelineClient(); + TimelineClient client = + context.getApplications().get(appId).getTimelineClient(); + if (client != null) { client.setTimelineServiceAddress(collectorAddr); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index b010eee..c052a07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -115,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; @@ -190,7 +191,8 @@ private long waitForContainersOnShutdownMillis; - private final NMTimelinePublisher nmMetricsPublisher; + // NM metrics publisher is set only if the timeline service v.2 is enabled + private NMTimelinePublisher nmMetricsPublisher; public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -218,8 +220,15 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); - nmMetricsPublisher = createNMTimelinePublisher(context); - context.setNMTimelinePublisher(nmMetricsPublisher); + // initialize the metrics publisher if the timeline service v.2 is enabled + // and the system publisher is enabled + Configuration conf = context.getConf(); + if (YarnConfiguration.timelineServiceV2Enabled(conf) && + YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + LOG.info("YARN system metrics publishing service is enabled"); + nmMetricsPublisher = createNMTimelinePublisher(context); + context.setNMTimelinePublisher(nmMetricsPublisher); + } this.containersMonitor = new ContainersMonitorImpl(exec, dispatcher, this.context); addService(this.containersMonitor); @@ -237,7 +246,6 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, addService(dispatcher); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -335,7 +343,7 @@ private void recoverApplication(ContainerManagerApplicationProto p) LOG.info("Recovering application " + appId); //TODO: Recover flow and flow run ID ApplicationImpl app = new ApplicationImpl( - dispatcher, p.getUser(), null, 
null, 0, appId, creds, context); + dispatcher, p.getUser(), appId, creds, context); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @@ -941,20 +949,29 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, try { if (!serviceStopped) { // Create the application - String flowName = launchContext.getEnvironment().get( - TimelineUtils.FLOW_NAME_TAG_PREFIX); - String flowVersion = launchContext.getEnvironment().get( - TimelineUtils.FLOW_VERSION_TAG_PREFIX); - String flowRunIdStr = launchContext.getEnvironment().get( - TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); - long flowRunId = 0L; - if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { - flowRunId = Long.parseLong(flowRunIdStr); + // populate the flow context from the launch context if the timeline + // service v.2 is enabled + FlowContext flowContext; + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + String flowName = launchContext.getEnvironment().get( + TimelineUtils.FLOW_NAME_TAG_PREFIX); + String flowVersion = launchContext.getEnvironment().get( + TimelineUtils.FLOW_VERSION_TAG_PREFIX); + String flowRunIdStr = launchContext.getEnvironment().get( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); + long flowRunId = 0L; + if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { + flowRunId = Long.parseLong(flowRunIdStr); + } + flowContext = + new FlowContext(flowName, flowVersion, flowRunId); + } else { + flowContext = null; } if (!context.getApplications().containsKey(applicationID)) { Application application = - new ApplicationImpl(dispatcher, user, flowName, flowVersion, - flowRunId, applicationID, credentials, context); + new ApplicationImpl(dispatcher, user, flowContext, + applicationID, credentials, context); if (context.getApplications().putIfAbsent(applicationID, application) == null) { LOG.info("Creating a new application reference for app " @@ -1310,7 +1327,9 @@ public void handle(ContainerEvent event) { Container c = containers.get(event.getContainerID()); if (c != null) { c.handle(event); - nmMetricsPublisher.publishContainerEvent(event); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.publishContainerEvent(event); + } } else { LOG.warn("Event " + event + " sent to absent container " + event.getContainerID()); @@ -1326,7 +1345,9 @@ public void handle(ApplicationEvent event) { event.getApplicationID()); if (app != null) { app.handle(event); - nmMetricsPublisher.publishApplicationEvent(event); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.publishApplicationEvent(event); + } } else { LOG.warn("Event " + event + " sent to absent application " + event.getApplicationID()); @@ -1349,7 +1370,9 @@ public void handle(ApplicationEvent event) { @Override public void handle(LocalizationEvent event) { origLocalizationEventHandler.handle(event); - timelinePublisher.publishLocalizationEvent(event); + if (timelinePublisher != null) { + timelinePublisher.publishLocalizationEvent(event); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 6e87cfd..93c6758 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -67,9 +67,8 @@ final Dispatcher dispatcher; final String user; - final String flowName; - final String flowVersion; - final long flowRunId; + // flow context is set only if the timeline service v.2 is enabled + private FlowContext flowContext; final ApplicationId appId; final Credentials credentials; Map applicationACLs; @@ -86,14 +85,16 @@ Map containers = new HashMap(); - public ApplicationImpl(Dispatcher dispatcher, String user, String flowName, - String flowVersion, long flowRunId, ApplicationId appId, - Credentials credentials, Context context) { + public ApplicationImpl(Dispatcher dispatcher, String user, + ApplicationId appId, Credentials credentials, Context context) { + this(dispatcher, user, null, appId, credentials, context); + } + + public ApplicationImpl(Dispatcher dispatcher, String user, + FlowContext flowContext, ApplicationId appId, Credentials credentials, + Context context) { this.dispatcher = dispatcher; this.user = user; - this.flowName = flowName; - this.flowVersion = flowVersion; - this.flowRunId = flowRunId; this.appId = appId; this.credentials = credentials; this.aclsManager = context.getApplicationACLsManager(); @@ -103,11 +104,44 @@ public ApplicationImpl(Dispatcher dispatcher, String user, String flowName, writeLock = lock.writeLock(); stateMachine = stateMachineFactory.make(this); Configuration conf = context.getConf(); - if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { - createAndStartTimelineClient(conf); + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + if (flowContext == null) { + throw new IllegalArgumentException("flow context cannot be null"); + } + this.flowContext = flowContext; + if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + createAndStartTimelineClient(conf); + } } } - + + /** + * Data object that encapsulates the flow context for the application purpose. + */ + public static class FlowContext { + private final String flowName; + private final String flowVersion; + private final long flowRunId; + + public FlowContext(String flowName, String flowVersion, long flowRunId) { + this.flowName = flowName; + this.flowVersion = flowVersion; + this.flowRunId = flowRunId; + } + + public String getFlowName() { + return flowName; + } + + public String getFlowVersion() { + return flowVersion; + } + + public long getFlowRunId() { + return flowRunId; + } + } + private void createAndStartTimelineClient(Configuration conf) { // create and start timeline client this.timelineClient = TimelineClient.createTimelineClient(appId); @@ -454,7 +488,11 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { // Remove collectors info for finished apps. // TODO check we remove related collectors info in failure cases // (YARN-3038) - app.context.getRegisteredCollectors().remove(app.getAppId()); + Map registeredCollectors = + app.context.getRegisteredCollectors(); + if (registeredCollectors != null) { + registeredCollectors.remove(app.getAppId()); + } // stop timelineClient when application get finished. 
TimelineClient timelineClient = app.getTimelineClient(); if (timelineClient != null) { @@ -521,16 +559,16 @@ public LogAggregationContext getLogAggregationContext() { @Override public String getFlowName() { - return flowName; + return flowContext == null ? null : flowContext.getFlowName(); } @Override public String getFlowVersion() { - return flowVersion; + return flowContext == null ? null : flowContext.getFlowVersion(); } @Override public long getFlowRunId() { - return flowRunId; + return flowContext == null ? 0L : flowContext.getFlowRunId(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index dfa32ac..589cf75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; @@ -559,9 +560,13 @@ public void run() { ContainerImpl container = (ContainerImpl) context.getContainers().get(containerId); - container.getNMTimelinePublisher().reportContainerResourceUsage( - container, currentTime, pId, currentPmemUsage, - cpuUsageTotalCoresPercentage); + NMTimelinePublisher nmMetricsPublisher = + container.getNMTimelinePublisher(); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.reportContainerResourceUsage( + container, currentTime, pId, currentPmemUsage, + cpuUsageTotalCoresPercentage); + } } catch (Exception e) { // Log the exception and proceed to the next container. 
LOG.warn("Uncaught exception in ContainersMonitorImpl " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 2c5c300..69de433 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -56,12 +55,16 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +/** + * Metrics publisher service that publishes data to the timeline service v.2. It + * is used only if the timeline service v.2 is enabled and the system publishing + * of events and metrics is enabled. + */ public class NMTimelinePublisher extends CompositeService { private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class); private Dispatcher dispatcher; - private boolean publishSystemMetrics; private Context context; @@ -76,24 +79,16 @@ public NMTimelinePublisher(Context context) { @Override protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetrics = - YarnConfiguration.systemMetricsPublisherEnabled(conf); - - if (publishSystemMetrics) { - dispatcher = new AsyncDispatcher(); - dispatcher.register(NMTimelineEventType.class, - new ForwardingEventHandler()); - dispatcher - .register(ContainerEventType.class, new ContainerEventHandler()); - dispatcher.register(ApplicationEventType.class, - new ApplicationEventHandler()); - dispatcher.register(LocalizationEventType.class, - new LocalizationEventDispatcher()); - addIfService(dispatcher); - LOG.info("YARN system metrics publishing service is enabled"); - } else { - LOG.info("YARN system metrics publishing service is not enabled"); - } + dispatcher = new AsyncDispatcher(); + dispatcher.register(NMTimelineEventType.class, + new ForwardingEventHandler()); + dispatcher + .register(ContainerEventType.class, new ContainerEventHandler()); + dispatcher.register(ApplicationEventType.class, + new ApplicationEventHandler()); + dispatcher.register(LocalizationEventType.class, + new LocalizationEventDispatcher()); + addIfService(dispatcher); super.serviceInit(conf); } @@ -121,8 +116,9 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) { public void reportContainerResourceUsage(Container container, long createdTime, String pId, Long pmemUsage, Float cpuUsageTotalCoresPercentage) { - if (publishSystemMetrics - && (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE)) { + if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || + 
cpuUsageTotalCoresPercentage != + ResourceCalculatorProcessTree.UNAVAILABLE) { ContainerEntity entity = createContainerEntity(container.getContainerId()); long currentTimeMillis = System.currentTimeMillis(); @@ -219,9 +215,6 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) { } public void publishApplicationEvent(ApplicationEvent event) { - if (!publishSystemMetrics) { - return; - } // publish only when the desired event is received switch (event.getType()) { case INIT_APPLICATION: @@ -242,9 +235,6 @@ public void publishApplicationEvent(ApplicationEvent event) { } public void publishContainerEvent(ContainerEvent event) { - if (!publishSystemMetrics) { - return; - } // publish only when the desired event is received switch (event.getType()) { case INIT_CONTAINER: @@ -262,9 +252,6 @@ public void publishContainerEvent(ContainerEvent event) { } public void publishLocalizationEvent(LocalizationEvent event) { - if (!publishSystemMetrics) { - return; - } // publish only when the desired event is received switch (event.getType()) { case CONTAINER_RESOURCES_LOCALIZED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index f29b791..c43777c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -50,7 +50,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -95,7 +94,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -106,7 +104,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; public class TestContainerManagerRecovery extends BaseContainerManagerTest { @@ -473,7 +470,7 @@ private NMContext createContext(Configuration conf, NMStateStoreService stateStore) { NMContext context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore, 
conf){ + new ApplicationACLsManager(conf), stateStore, conf) { public int getHttpPort() { return HTTP_PORT; } @@ -638,9 +635,9 @@ public void setBlockNewContainerRequests( } @Override - public NMTimelinePublisher createNMTimelinePublisher(Context context) { - NMTimelinePublisher timelinePublisher = mock(NMTimelinePublisher.class); - return timelinePublisher; + public NMTimelinePublisher + createNMTimelinePublisher(Context context) { + return null; } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 38b3172f..f31a98c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -535,7 +534,7 @@ public boolean matches(Object argument) { this.appId = BuilderUtils.newApplicationId(timestamp, id); app = new ApplicationImpl( - dispatcher, this.user, null, null, 0, appId, null, context); + dispatcher, this.user, appId, null, context); containers = new ArrayList(); for (int i = 0; i < numContainers; i++) { Container container = createMockedContainer(this.appId, i); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index ef5eb65..a6818ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -31,17 +31,14 @@ import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.NodeHealthScriptRunner; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import 
org.apache.hadoop.yarn.server.nodemanager.Context; @@ -50,7 +47,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp; @@ -64,6 +60,7 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.w3c.dom.Document; @@ -327,7 +324,7 @@ public void testContainerLogs() throws IOException { final String filename = "logfile1"; final String logMessage = "log message\n"; nmContext.getApplications().put(appId, new ApplicationImpl(null, "user", - null, null, 0, appId, null, nmContext)); + appId, null, nmContext)); MockContainer container = new MockContainer(appAttemptId, new AsyncDispatcher(), new Configuration(), "user", appId, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index dd817d0..9ace1fb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -293,8 +293,11 @@ public FinishApplicationMasterResponse finishApplicationMaster( RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); + // Remove collector address when app get finished. - rmApp.removeCollectorAddr(); + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + rmApp.removeCollectorAddr(); + } // checking whether the app exits in RMStateStore at first not to throw // ApplicationDoesNotExistInCacheException before and after // RM work-preserving restart. 
@@ -562,8 +565,10 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); // add collector address for this application - allocateResponse.setCollectorAddr( - this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + allocateResponse.setCollectorAddr( + this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); + } // add preemption to the allocateResponse message (if any) allocateResponse diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 213c226..04f4df2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -571,24 +571,27 @@ public SubmitApplicationResponse submitApplication( throw RPCUtil.getRemoteException(ie); } - // Sanity check for flow run - String value = null; - try { - for (String tag : submissionContext.getApplicationTags()) { - if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || - tag.startsWith( - TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { - value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1); - Long.valueOf(value); + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + // Sanity check for flow run + String value = null; + try { + for (String tag : submissionContext.getApplicationTags()) { + if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || + tag.startsWith( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { + value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + + 1); + Long.valueOf(value); + } } + } catch (NumberFormatException e) { + LOG.warn("Invalid to flow run: " + value + + ". Flow run should be a long integer", e); + RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, + e.getMessage(), "ClientRMService", + "Exception in submitting application", applicationId); + throw RPCUtil.getRemoteException(e); } - } catch (NumberFormatException e) { - LOG.warn("Invalid to flow run: " + value + - ". 
Flow run should be a long integer", e); - RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, - e.getMessage(), "ClientRMService", - "Exception in submitting application", applicationId); - throw RPCUtil.getRemoteException(e); } // Check whether app has already been put into rmContext, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 261526e..546609b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -364,8 +364,11 @@ private RMAppImpl createAndPopulateNewRMApp( LOG.warn(message); throw new YarnException(message); } - // Start timeline collector for the submitted app - application.startTimelineCollector(); + + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + // Start timeline collector for the submitted app + application.startTimelineCollector(); + } // Inform the ACLs Manager this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a5de053..b381132 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -375,20 +375,23 @@ private RMTimelineCollectorManager createRMTimelineCollectorManager() { } protected SystemMetricsPublisher createSystemMetricsPublisher() { - boolean timelineServiceEnabled = - conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); SystemMetricsPublisher publisher = null; - if (timelineServiceEnabled) { - if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) { - LOG.info("TimelineService V1 is configured"); + if (YarnConfiguration.timelineServiceEnabled(conf)) { + float version = YarnConfiguration.getTimelineServiceVersion(conf); + if (version == 1 && + conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) { + LOG.info("system metrics publisher with the timeline service V1 is " + + "configured"); publisher = new TimelineServiceV1Publisher(); - } else { - LOG.info("TimelineService V2 is configured"); + } else if (version == 2 && + YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + LOG.info("system metrics publisher with the timeline service V2 is " + + "configured"); publisher = new TimelineServiceV2Publisher(rmContext); } - 
} else { + } + if (publisher == null) { LOG.info("TimelineServicePublisher is not configured"); publisher = new NoOpSystemMetricPublisher(); } @@ -515,10 +518,12 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(rmApplicationHistoryWriter); rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - RMTimelineCollectorManager timelineCollectorManager = - createRMTimelineCollectorManager(); - addService(timelineCollectorManager); - rmContext.setRMTimelineCollectorManager(timelineCollectorManager); + if (YarnConfiguration.timelineServiceV2Enabled(configuration)) { + RMTimelineCollectorManager timelineCollectorManager = + createRMTimelineCollectorManager(); + addService(timelineCollectorManager); + rmContext.setRMTimelineCollectorManager(timelineCollectorManager); + } // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index e7eee6b..8cf65a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -445,10 +445,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) message); } - // Check & update collectors info from request. - // TODO make sure it won't have race condition issue for AM failed over case - // that the older registration could possible override the newer one. - updateAppCollectorsMap(request); + boolean timelineV2Enabled = + YarnConfiguration.timelineServiceV2Enabled(getConfig()); + if (timelineV2Enabled) { + // Check & update collectors info from request. + // TODO make sure it won't have race condition issue for AM failed over + // case that the older registration could possible override the newer + // one. + updateAppCollectorsMap(request); + } // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils @@ -467,12 +472,12 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); } - // Return collectors' map that NM needs to know - // TODO we should optimize this to only include collector info that NM - // doesn't know yet. List keepAliveApps = remoteNodeStatus.getKeepAliveApplications(); - if (keepAliveApps != null) { + if (timelineV2Enabled && keepAliveApps != null) { + // Return collectors' map that NM needs to know + // TODO we should optimize this to only include collector info that NM + // doesn't know yet. 
setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index df760a3..9a7638c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.NMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -215,12 +216,14 @@ protected void setupTokens( .get(applicationId) .getSubmitTime())); - // Set flow context info - for (String tag : - rmContext.getRMApps().get(applicationId).getApplicationTags()) { - setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag); - setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag); - setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag); + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + // Set flow context info + for (String tag : + rmContext.getRMApps().get(applicationId).getApplicationTags()) { + setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag); + setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag); + setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag); + } } Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index e0c593d..e83bcc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -58,7 +58,7 @@ import com.google.common.annotations.VisibleForTesting; /** - * This class is responsible for posting application, appattempt & Container + * This class is responsible for posting application, appattempt & Container * lifecycle related events to timeline service V2 */ @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java 
index 05f682a..78f534f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -913,15 +913,17 @@ public void transition(RMAppImpl app, RMAppEvent event) { extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { - LOG.info("Updating collector info for app: " + app.getApplicationId()); + if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) { + LOG.info("Updating collector info for app: " + app.getApplicationId()); - RMAppCollectorUpdateEvent appCollectorUpdateEvent = - (RMAppCollectorUpdateEvent) event; - // Update collector address - app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr()); + RMAppCollectorUpdateEvent appCollectorUpdateEvent = + (RMAppCollectorUpdateEvent) event; + // Update collector address + app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr()); - // TODO persistent to RMStateStore for recover - // Save to RMStateStore + // TODO persistent to RMStateStore for recover + // Save to RMStateStore + } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 8215468..bdf7100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -641,6 +641,7 @@ public void handle(Event event) {} ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, mockAclsManager, mockQueueACLsManager, null); + rmService.init(new Configuration()); // without name and queue @@ -732,6 +733,7 @@ public void handle(Event event) {} ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, mockAclsManager, mockQueueACLsManager, null); + rmService.init(new Configuration()); // Initialize appnames and queues String[] queues = {QUEUE_1, QUEUE_2}; @@ -895,6 +897,7 @@ public void handle(Event rawEvent) { final ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, null, null, null); + rmService.init(new Configuration()); // submit an app and wait for it to block while in app submission Thread t = new Thread() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index 20a5b13..baaa566 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -131,6 +131,7 @@ public static void tearDown() throws Exception { private static Configuration getTimelineV2Conf() { Configuration conf = new Configuration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); conf.setInt( YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 4147d42..bbbcd8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -30,12 +30,11 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; -import org.apache.hadoop.yarn.server.api.ContainerContext; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -68,6 +67,9 @@ public PerNodeTimelineCollectorsAuxService() { @Override protected void serviceInit(Configuration conf) throws Exception { + if (!YarnConfiguration.timelineServiceV2Enabled(conf)) { + throw new YarnException("Timeline service v2 is not enabled"); + } collectorManager.init(conf); super.serviceInit(conf); } @@ -188,6 +190,9 @@ public ByteBuffer getMetaData() { ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService), SHUTDOWN_HOOK_PRIORITY); YarnConfiguration conf = new YarnConfiguration(); + // enable timeline service v.2 + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); auxService.init(conf); auxService.start(); } catch (Throwable t) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java index afe1536..826deb7 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; @@ -66,6 +67,10 @@ public TimelineReaderServer() { @Override protected void serviceInit(Configuration conf) throws Exception { + if (!YarnConfiguration.timelineServiceV2Enabled(conf)) { + throw new YarnException("timeline service v.2 is not enabled"); + } + TimelineReader timelineReaderStore = createTimelineReaderStore(conf); addService(timelineReaderStore); timelineReaderManager = createTimelineReaderManager(timelineReaderStore); @@ -155,6 +160,9 @@ static TimelineReaderServer startTimelineReaderServer(String[] args) { new CompositeServiceShutdownHook(timelineReaderServer), SHUTDOWN_HOOK_PRIORITY); YarnConfiguration conf = new YarnConfiguration(); + // enable timeline service v.2 + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); timelineReaderServer.init(conf); timelineReaderServer.start(); } catch (Throwable t) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java index dafc76e..c6a8452 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -160,7 +160,11 @@ private PerNodeTimelineCollectorsAuxService createCollector() { NodeTimelineCollectorManager collectorManager = createCollectorManager(); PerNodeTimelineCollectorsAuxService auxService = spy(new PerNodeTimelineCollectorsAuxService(collectorManager)); - auxService.init(new YarnConfiguration()); + YarnConfiguration conf = new YarnConfiguration(); + // enable timeline service v.2 + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + auxService.init(conf); auxService.start(); return auxService; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java index 7098814..b42488c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java @@ -30,8 +30,11 @@ @Test(timeout = 60000) public void testStartStopServer() throws Exception { + @SuppressWarnings("resource") TimelineReaderServer server = new TimelineReaderServer(); Configuration config = new YarnConfiguration(); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0"); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java index 45bce2f..91f6ee5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java @@ -72,6 +72,8 @@ public static void tearDown() throws Exception { public void init() throws Exception { try { Configuration config = new YarnConfiguration(); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0"); config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 3b285aa..818cd89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -238,6 +238,8 @@ public static void tearDown() throws Exception { public void init() throws Exception { try { Configuration config = util.getConfiguration(); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0"); 
config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
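
For reference, a minimal sketch (not part of the patch) of the configuration the changed code paths expect before any timeline service v.2 component is created. It assumes only the YarnConfiguration keys and the timelineServiceV2Enabled/systemMetricsPublisherEnabled helpers already referenced in the hunks above; the wrapper class and method names are illustrative.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineV2ConfSketch {
  public static YarnConfiguration newTimelineV2Conf() {
    YarnConfiguration conf = new YarnConfiguration();
    // The same three settings the tests in this patch use to opt into v.2.
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
    return conf;
  }

  public static void main(String[] args) {
    YarnConfiguration conf = newTimelineV2Conf();
    // The patched daemons guard their v.2-only setup with checks like this.
    if (YarnConfiguration.timelineServiceV2Enabled(conf)
        && YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
      System.out.println("timeline service v.2 publishing would be enabled");
    }
  }
}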
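A small sketch of the null-guard pattern the node manager changes rely on: the NMTimelinePublisher reference may legitimately be null when v.2 publishing is off, so every call site checks it before publishing. The helper class and method name are illustrative.

import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;

final class PublisherGuardSketch {
  // nmMetricsPublisher stays null unless timeline service v.2 and the system
  // metrics publisher are both enabled, so publish only when it is present.
  static void publishIfEnabled(NMTimelinePublisher nmMetricsPublisher,
      ContainerEvent event) {
    if (nmMetricsPublisher != null) {
      nmMetricsPublisher.publishContainerEvent(event);
    }
  }
}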
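A sketch of how the flow context is assembled on the NM side, assuming only the environment keys and the FlowContext class introduced above; the helper class and method name are illustrative.

import java.util.Map;

import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

final class FlowContextSketch {
  // Mirrors startContainerInternal(): the flow identifiers travel to the NM as
  // AM container environment entries, and the run id defaults to 0 when unset.
  static FlowContext fromEnvironment(Map<String, String> env) {
    String flowName = env.get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
    String flowVersion = env.get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
    String flowRunIdStr = env.get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
    long flowRunId = 0L;
    if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
      flowRunId = Long.parseLong(flowRunIdStr);
    }
    return new FlowContext(flowName, flowVersion, flowRunId);
  }
}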
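A sketch of the flow run id sanity check performed at submission time, under the assumption that application tags carry the FLOW_RUN_ID_TAG_PREFIX followed by a colon and a long value, as in the ClientRMService hunk; the helper class and method name are illustrative, and a NumberFormatException signals an invalid tag just as it does there.

import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

final class FlowRunIdTagSketch {
  // Returns the parsed flow run id, or 0 if the tag is not a flow run id tag.
  // A malformed value propagates a NumberFormatException to the caller.
  static long parseFlowRunIdTag(String tag) {
    String prefix = TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":";
    if (tag.startsWith(prefix) || tag.startsWith(prefix.toLowerCase())) {
      return Long.valueOf(tag.substring(prefix.length()));
    }
    return 0L;
  }
}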