diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index f52e654..214fcc0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -122,13 +122,6 @@ protected static final Map fileMap = Collections.synchronizedMap(new HashMap()); - - // For posting entities in new timeline service in a non-blocking way - // TODO YARN-3367 replace with event loop in TimelineClient. - private static ExecutorService threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); // should job completion be force when the AM shuts down? protected volatile boolean forceJobCompletion = false; @@ -137,6 +130,10 @@ private boolean newTimelineServiceEnabled = false; + // For posting entities in new timeline service in a non-blocking way + // TODO YARN-3367 replace with event loop in TimelineClient. + private ExecutorService threadPool; + private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT"; @@ -271,11 +268,15 @@ protected void serviceInit(Configuration conf) throws Exception { timelineClient = ((MRAppMaster.RunningAppContext)context).getTimelineClient(); timelineClient.init(conf); + // TODO check the timeline service version on the server side and + // possibly replace this with that newTimelineServiceEnabled = conf.getBoolean( MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1")); LOG.info("Emitting job history data to the timeline server is enabled"); + // initialize the thread pool + threadPool = createThreadPool(); } else { LOG.info("Timeline service is not enabled"); } @@ -448,19 +449,27 @@ protected void serviceStop() throws Exception { if (timelineClient != null) { timelineClient.stop(); } - shutdownAndAwaitTermination(); + if (threadPool != null) { + shutdownAndAwaitTermination(); + } LOG.info("Stopped JobHistoryEventHandler. super.stop()"); super.serviceStop(); } // TODO remove threadPool after adding non-blocking call in TimelineClient - private static void shutdownAndAwaitTermination() { + private ExecutorService createThreadPool() { + return Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); + } + + private void shutdownAndAwaitTermination() { threadPool.shutdown(); try { if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); + LOG.error("ThreadPool did not terminate"); } } catch (InterruptedException ie) { threadPool.shutdownNow(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index dafb6e9..efc5780 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -1020,6 +1020,8 @@ public RunningAppContext(Configuration config, && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + // TODO check the timeline service version on the server side and + // possibly replace this with that boolean newTimelineServiceEnabled = conf.getBoolean( MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java index 18a4c14..edb825d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java @@ -173,7 +173,7 @@ public void serviceInit(Configuration conf) throws Exception { boolean enableTimelineAuxService = false; if (nmAuxServices != null) { for (String nmAuxService: nmAuxServices) { - if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) { + if (nmAuxService.equals(TIMELINE_AUX_SERVICE_NAME)) { enableTimelineAuxService = true; break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index e27c947..cb4c2ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -221,10 +221,7 @@ // For posting entities in new timeline service in a non-blocking way // TODO replace with event loop in TimelineClient. - private static ExecutorService threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); + private ExecutorService threadPool; // App Master configuration // No. of containers to run shell command on @@ -314,8 +311,10 @@ public static void main(String[] args) { } appMaster.run(); result = appMaster.finish(); - - shutdownAndAwaitTermination(); + + if (appMaster.threadPool != null) { + appMaster.shutdownAndAwaitTermination(); + } } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); @@ -329,16 +328,22 @@ public static void main(String[] args) { System.exit(2); } } - + //TODO remove threadPool after adding non-blocking call in TimelineClient - private static void shutdownAndAwaitTermination() { + private ExecutorService createThreadPool() { + return Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); + } + + private void shutdownAndAwaitTermination() { threadPool.shutdown(); try { // Wait a while for existing tasks to terminate if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); + LOG.error("ThreadPool did not terminate"); } } catch (InterruptedException ie) { threadPool.shutdownNow(); @@ -550,12 +555,16 @@ public boolean init(String[] args) throws ParseException, IOException { if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) { newTimelineService = false; } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) { + // TODO check the v.2 server configuration newTimelineService = true; } else { throw new IllegalArgumentException( "timeline_service_version is not set properly, should be 'v1' or 'v2'"); } } + if (newTimelineService) { + threadPool = createThreadPool(); + } } else { timelineClient = null; LOG.warn("Timeline service is not enabled"); @@ -623,8 +632,10 @@ public void run() throws YarnException, IOException, InterruptedException { nmClientAsync.start(); startTimelineClient(conf); - // need to bind timelineClient - amRMClient.registerTimelineClient(timelineClient); + if (newTimelineService) { + // need to bind timelineClient + amRMClient.registerTimelineClient(timelineClient); + } if(timelineClient != null) { if (newTimelineService) { publishApplicationAttemptEventOnNewTimelineService(timelineClient, @@ -982,11 +993,11 @@ public void onContainerStarted(ContainerId containerId, } if(applicationMaster.timelineClient != null) { if (applicationMaster.newTimelineService) { - ApplicationMaster.publishContainerStartEventOnNewTimelineService( + applicationMaster.publishContainerStartEventOnNewTimelineService( applicationMaster.timelineClient, container, applicationMaster.domainId, applicationMaster.appSubmitterUgi); } else { - ApplicationMaster.publishContainerStartEvent( + applicationMaster.publishContainerStartEvent( applicationMaster.timelineClient, container, applicationMaster.domainId, applicationMaster.appSubmitterUgi); } @@ -1195,7 +1206,7 @@ private String readContent(String filePath) throws IOException { } } - private static void publishContainerStartEvent( + private void publishContainerStartEvent( final TimelineClient timelineClient, Container container, String domainId, UserGroupInformation ugi) { final TimelineEntity entity = new TimelineEntity(); @@ -1224,7 +1235,7 @@ public TimelinePutResponse run() throws Exception { } } - private static void publishContainerEndEvent( + private void publishContainerEndEvent( final TimelineClient timelineClient, ContainerStatus container, String domainId, UserGroupInformation ugi) { final TimelineEntity entity = new TimelineEntity(); @@ -1246,7 +1257,7 @@ private static void publishContainerEndEvent( } } - private static void publishApplicationAttemptEvent( + private void publishApplicationAttemptEvent( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi) { final TimelineEntity entity = new TimelineEntity(); @@ -1296,7 +1307,7 @@ Thread createLaunchContainerThread(Container allocatedContainer, return new Thread(runnableLaunchContainer); } - private static void publishContainerStartEventOnNewTimelineService( + private void publishContainerStartEventOnNewTimelineService( final TimelineClient timelineClient, final Container container, final String domainId, final UserGroupInformation ugi) { Runnable publishWrapper = new Runnable() { @@ -1308,7 +1319,7 @@ public void run() { threadPool.execute(publishWrapper); } - private static void publishContainerStartEventOnNewTimelineServiceBase( + private void publishContainerStartEventOnNewTimelineServiceBase( final TimelineClient timelineClient, Container container, String domainId, UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = @@ -1341,7 +1352,7 @@ public TimelinePutResponse run() throws Exception { } } - private static void publishContainerEndEventOnNewTimelineService( + private void publishContainerEndEventOnNewTimelineService( final TimelineClient timelineClient, final ContainerStatus container, final String domainId, final UserGroupInformation ugi) { Runnable publishWrapper = new Runnable() { @@ -1353,7 +1364,7 @@ public void run() { threadPool.execute(publishWrapper); } - private static void publishContainerEndEventOnNewTimelineServiceBase( + private void publishContainerEndEventOnNewTimelineServiceBase( final TimelineClient timelineClient, final ContainerStatus container, final String domainId, final UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = @@ -1385,7 +1396,7 @@ public TimelinePutResponse run() throws Exception { } } - private static void publishApplicationAttemptEventOnNewTimelineService( + private void publishApplicationAttemptEventOnNewTimelineService( final TimelineClient timelineClient, final String appAttemptId, final DSEvent appEvent, final String domainId, final UserGroupInformation ugi) { @@ -1399,7 +1410,7 @@ public void run() { threadPool.execute(publishWrapper); } - private static void publishApplicationAttemptEventOnNewTimelineServiceBase( + private void publishApplicationAttemptEventOnNewTimelineServiceBase( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index f2707ba..96ff2b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -53,12 +53,18 @@ */ protected ApplicationId contextAppId; + /** + * Creates an instance of the timeline v.1.x client. + */ @Public public static TimelineClient createTimelineClient() { TimelineClient client = new TimelineClientImpl(); return client; } + /** + * Creates an instance of the timeline v.2 client. + */ @Public public static TimelineClient createTimelineClient(ApplicationId appId) { TimelineClient client = new TimelineClientImpl(appId); @@ -156,8 +162,9 @@ public abstract void cancelDelegationToken( /** *

* Send the information of a number of conceptual entities to the timeline - * aggregator. It is a blocking API. The method will not return until all the - * put entities have been persisted. + * service v.2 collector. It is a blocking API. The method will not return + * until all the put entities have been persisted. If this method is invoked + * for a non-v.2 timeline client instance, a YarnException is thrown. *

* * @param entities @@ -173,8 +180,9 @@ public abstract void putEntities( /** *

* Send the information of a number of conceptual entities to the timeline - * aggregator. It is an asynchronous API. The method will return once all the - * entities are received. + * service v.2 collector. It is an asynchronous API. The method will return + * once all the entities are received. If this method is invoked for a + * non-v.2 timeline client instance, a YarnException is thrown. *

* * @param entities diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 8312b6d..2190d91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -353,6 +353,9 @@ public void putEntitiesAsync( private void putEntities(boolean async, org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) throws IOException, YarnException { + if (!newTimelineService) { + throw new YarnException("v.2 method is invoked on a v.1.x client"); + } org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entitiesContainer = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index 137b7c5..341b296 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -56,6 +56,17 @@ } /** + * Returns whether the timeline service v.2 is enabled via configuration. + */ + public static boolean timelineV2Enabled(Configuration conf) { + boolean timelineServiceEnabled = + conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + // TODO decide what to do for the v.2 config for this JIRA + return timelineServiceEnabled; + } + + /** * Serialize a POJO object into a JSON string not in a pretty format * * @param o diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index fa0cf5c..066abfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -261,10 +261,12 @@ public void setLastKnownNMTokenMasterKey(MasterKey masterKey) { private void initRegisteredCollectors() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; List list = p.getRegisteredCollectorsList(); - this.registeredCollectors = new HashMap (); - for (AppCollectorsMapProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + if (!list.isEmpty()) { + this.registeredCollectors = new HashMap<>(); + for (AppCollectorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 2521b9c..151006b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -531,10 +531,12 @@ private void initSystemCredentials() { private void initAppCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; List list = p.getAppCollectorsMapList(); - this.appCollectorsMap = new HashMap (); - for (AppCollectorsMapProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + if (!list.isEmpty()) { + this.appCollectorsMap = new HashMap<>(); + for (AppCollectorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 0b378a1..8fce422 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -64,7 +64,8 @@ /** * Get the registered collectors that located on this NM. - * @return registered + * @return registered collectors, or null if the timeline service v.2 is not + * enabled */ Map getRegisteredCollectors(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 601bd04..f447069 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -97,6 +98,7 @@ private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + // the NM collector service is set only if the timeline service v.2 is enabled private NMCollectorService nmCollectorService; private NodeStatusUpdater nodeStatusUpdater; private NodeResourceMonitor nodeResourceMonitor; @@ -356,8 +358,10 @@ protected void serviceInit(Configuration conf) throws Exception { DefaultMetricsSystem.initialize("NodeManager"); - this.nmCollectorService = createNMCollectorService(context); - addService(nmCollectorService); + if (TimelineUtils.timelineV2Enabled(conf)) { + this.nmCollectorService = createNMCollectorService(context); + addService(nmCollectorService); + } // StatusUpdater should be added last so that it get started last // so that we make sure everything is up before registering with RM. @@ -457,8 +461,7 @@ public void run() { protected final ConcurrentMap containers = new ConcurrentSkipListMap(); - protected Map registeredCollectors = - new ConcurrentHashMap(); + protected Map registeredCollectors; protected final ConcurrentMap increasedContainers = @@ -484,6 +487,9 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, NMStateStoreService stateStore, Configuration conf) { + if (TimelineUtils.timelineV2Enabled(conf)) { + this.registeredCollectors = new ConcurrentHashMap<>(); + } this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = dirsHandler; @@ -620,7 +626,9 @@ public void setSystemCrendentialsForApps( public void addRegisteredCollectors( Map newRegisteredCollectors) { - this.registeredCollectors.putAll(newRegisteredCollectors); + if (this.registeredCollectors != null) { + this.registeredCollectors.putAll(newRegisteredCollectors); + } } @Override @@ -709,7 +717,14 @@ public Context getNMContext() { return this.context; } - // For testing + /** + * Returns the NM collector service. It should be used only for testing + * purposes. + * + * @return the NM collector service, or null if the timeline service v.2 is + * not enabled + */ + @VisibleForTesting NMCollectorService getNMCollectorService() { return this.nmCollectorService; } @@ -717,6 +732,7 @@ NMCollectorService getNMCollectorService() { public static void main(String[] args) throws IOException { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); + @SuppressWarnings("resource") NodeManager nodeManager = new NodeManager(); Configuration conf = new YarnConfiguration(); new GenericOptionsParser(conf, args); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index d39204f..0847b9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -850,23 +850,6 @@ public void run() { } } - /** - * Caller should take care of sending non null nodelabels for both - * arguments - * - * @param nodeLabelsNew - * @param nodeLabelsOld - * @return if the New node labels are diff from the older one. - */ - private boolean areNodeLabelsUpdated(Set nodeLabelsNew, - Set nodeLabelsOld) { - if (nodeLabelsNew.size() != nodeLabelsOld.size() - || !nodeLabelsOld.containsAll(nodeLabelsNew)) { - return true; - } - return false; - } - private void updateTimelineClientsAddress( NodeHeartbeatResponse response) { Set> rmKnownCollectors = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index db79ee5..35bfa8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -43,6 +43,10 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +/** + * Service that handles collector information. It is used only if the timeline + * service v.2 is enabled. + */ public class NMCollectorService extends CompositeService implements CollectorNodemanagerProtocol { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index b010eee..25129ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -115,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; @@ -190,7 +191,8 @@ private long waitForContainersOnShutdownMillis; - private final NMTimelinePublisher nmMetricsPublisher; + // NM metrics publisher is set only if the timeline service v.2 is enabled + private NMTimelinePublisher nmMetricsPublisher; public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -218,8 +220,11 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); - nmMetricsPublisher = createNMTimelinePublisher(context); - context.setNMTimelinePublisher(nmMetricsPublisher); + // initialize the metrics publisher if the timeline service v.2 is enabled + if (TimelineUtils.timelineV2Enabled(context.getConf())) { + nmMetricsPublisher = createNMTimelinePublisher(context); + context.setNMTimelinePublisher(nmMetricsPublisher); + } this.containersMonitor = new ContainersMonitorImpl(exec, dispatcher, this.context); addService(this.containersMonitor); @@ -237,7 +242,6 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, addService(dispatcher); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -335,7 +339,7 @@ private void recoverApplication(ContainerManagerApplicationProto p) LOG.info("Recovering application " + appId); //TODO: Recover flow and flow run ID ApplicationImpl app = new ApplicationImpl( - dispatcher, p.getUser(), null, null, 0, appId, creds, context); + dispatcher, p.getUser(), appId, creds, context); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @@ -941,20 +945,29 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, try { if (!serviceStopped) { // Create the application - String flowName = launchContext.getEnvironment().get( - TimelineUtils.FLOW_NAME_TAG_PREFIX); - String flowVersion = launchContext.getEnvironment().get( - TimelineUtils.FLOW_VERSION_TAG_PREFIX); - String flowRunIdStr = launchContext.getEnvironment().get( - TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); - long flowRunId = 0L; - if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { - flowRunId = Long.parseLong(flowRunIdStr); + // populate the flow context from the launch context if the timeline + // service v.2 is enabled + FlowContext flowContext; + if (TimelineUtils.timelineV2Enabled(getConfig())) { + String flowName = launchContext.getEnvironment().get( + TimelineUtils.FLOW_NAME_TAG_PREFIX); + String flowVersion = launchContext.getEnvironment().get( + TimelineUtils.FLOW_VERSION_TAG_PREFIX); + String flowRunIdStr = launchContext.getEnvironment().get( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); + long flowRunId = 0L; + if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { + flowRunId = Long.parseLong(flowRunIdStr); + } + flowContext = + new FlowContext(flowName, flowVersion, flowRunId); + } else { + flowContext = null; } if (!context.getApplications().containsKey(applicationID)) { Application application = - new ApplicationImpl(dispatcher, user, flowName, flowVersion, - flowRunId, applicationID, credentials, context); + new ApplicationImpl(dispatcher, user, flowContext, + applicationID, credentials, context); if (context.getApplications().putIfAbsent(applicationID, application) == null) { LOG.info("Creating a new application reference for app " @@ -1310,7 +1323,9 @@ public void handle(ContainerEvent event) { Container c = containers.get(event.getContainerID()); if (c != null) { c.handle(event); - nmMetricsPublisher.publishContainerEvent(event); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.publishContainerEvent(event); + } } else { LOG.warn("Event " + event + " sent to absent container " + event.getContainerID()); @@ -1326,7 +1341,9 @@ public void handle(ApplicationEvent event) { event.getApplicationID()); if (app != null) { app.handle(event); - nmMetricsPublisher.publishApplicationEvent(event); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.publishApplicationEvent(event); + } } else { LOG.warn("Event " + event + " sent to absent application " + event.getApplicationID()); @@ -1349,7 +1366,9 @@ public void handle(ApplicationEvent event) { @Override public void handle(LocalizationEvent event) { origLocalizationEventHandler.handle(event); - timelinePublisher.publishLocalizationEvent(event); + if (timelinePublisher != null) { + timelinePublisher.publishLocalizationEvent(event); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 6e87cfd..ffed61a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -67,9 +68,8 @@ final Dispatcher dispatcher; final String user; - final String flowName; - final String flowVersion; - final long flowRunId; + // flow context is set only if the timeline service v.2 is enabled + private FlowContext flowContext; final ApplicationId appId; final Credentials credentials; Map applicationACLs; @@ -86,14 +86,16 @@ Map containers = new HashMap(); - public ApplicationImpl(Dispatcher dispatcher, String user, String flowName, - String flowVersion, long flowRunId, ApplicationId appId, - Credentials credentials, Context context) { + public ApplicationImpl(Dispatcher dispatcher, String user, + ApplicationId appId, Credentials credentials, Context context) { + this(dispatcher, user, null, appId, credentials, context); + } + + public ApplicationImpl(Dispatcher dispatcher, String user, + FlowContext flowContext, ApplicationId appId, Credentials credentials, + Context context) { this.dispatcher = dispatcher; this.user = user; - this.flowName = flowName; - this.flowVersion = flowVersion; - this.flowRunId = flowRunId; this.appId = appId; this.credentials = credentials; this.aclsManager = context.getApplicationACLsManager(); @@ -103,11 +105,44 @@ public ApplicationImpl(Dispatcher dispatcher, String user, String flowName, writeLock = lock.writeLock(); stateMachine = stateMachineFactory.make(this); Configuration conf = context.getConf(); - if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { - createAndStartTimelineClient(conf); + if (TimelineUtils.timelineV2Enabled(conf)) { + if (flowContext == null) { + throw new IllegalArgumentException("flow context cannot be null"); + } + this.flowContext = flowContext; + if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + createAndStartTimelineClient(conf); + } } } - + + /** + * Data object that encapsulates the flow context for the application purpose. + */ + public static class FlowContext { + private final String flowName; + private final String flowVersion; + private final long flowRunId; + + public FlowContext(String flowName, String flowVersion, long flowRunId) { + this.flowName = flowName; + this.flowVersion = flowVersion; + this.flowRunId = flowRunId; + } + + public String getFlowName() { + return flowName; + } + + public String getFlowVersion() { + return flowVersion; + } + + public long getFlowRunId() { + return flowRunId; + } + } + private void createAndStartTimelineClient(Configuration conf) { // create and start timeline client this.timelineClient = TimelineClient.createTimelineClient(appId); @@ -454,7 +489,11 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { // Remove collectors info for finished apps. // TODO check we remove related collectors info in failure cases // (YARN-3038) - app.context.getRegisteredCollectors().remove(app.getAppId()); + Map registeredCollectors = + app.context.getRegisteredCollectors(); + if (registeredCollectors != null) { + registeredCollectors.remove(app.getAppId()); + } // stop timelineClient when application get finished. TimelineClient timelineClient = app.getTimelineClient(); if (timelineClient != null) { @@ -521,16 +560,16 @@ public LogAggregationContext getLogAggregationContext() { @Override public String getFlowName() { - return flowName; + return flowContext == null ? null : flowContext.getFlowName(); } @Override public String getFlowVersion() { - return flowVersion; + return flowContext == null ? null : flowContext.getFlowVersion(); } @Override public long getFlowRunId() { - return flowRunId; + return flowContext == null ? 0L : flowContext.getFlowRunId(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index dfa32ac..589cf75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; @@ -559,9 +560,13 @@ public void run() { ContainerImpl container = (ContainerImpl) context.getContainers().get(containerId); - container.getNMTimelinePublisher().reportContainerResourceUsage( - container, currentTime, pId, currentPmemUsage, - cpuUsageTotalCoresPercentage); + NMTimelinePublisher nmMetricsPublisher = + container.getNMTimelinePublisher(); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.reportContainerResourceUsage( + container, currentTime, pId, currentPmemUsage, + cpuUsageTotalCoresPercentage); + } } catch (Exception e) { // Log the exception and proceed to the next container. LOG.warn("Uncaught exception in ContainersMonitorImpl " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 2c5c300..e077f08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -56,6 +56,10 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +/** + * Metrics publisher service that publishes data to the timeline service v.2. It + * is not used if the timeline service v.2 is not enabled. + */ public class NMTimelinePublisher extends CompositeService { private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index f29b791..c43777c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -50,7 +50,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -95,7 +94,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -106,7 +104,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; public class TestContainerManagerRecovery extends BaseContainerManagerTest { @@ -473,7 +470,7 @@ private NMContext createContext(Configuration conf, NMStateStoreService stateStore) { NMContext context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore, conf){ + new ApplicationACLsManager(conf), stateStore, conf) { public int getHttpPort() { return HTTP_PORT; } @@ -638,9 +635,9 @@ public void setBlockNewContainerRequests( } @Override - public NMTimelinePublisher createNMTimelinePublisher(Context context) { - NMTimelinePublisher timelinePublisher = mock(NMTimelinePublisher.class); - return timelinePublisher; + public NMTimelinePublisher + createNMTimelinePublisher(Context context) { + return null; } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 38b3172f..f31a98c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -535,7 +534,7 @@ public boolean matches(Object argument) { this.appId = BuilderUtils.newApplicationId(timestamp, id); app = new ApplicationImpl( - dispatcher, this.user, null, null, 0, appId, null, context); + dispatcher, this.user, appId, null, context); containers = new ArrayList(); for (int i = 0; i < numContainers; i++) { Container container = createMockedContainer(this.appId, i); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index ef5eb65..a6818ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -31,17 +31,14 @@ import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.NodeHealthScriptRunner; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -50,7 +47,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp; @@ -64,6 +60,7 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.w3c.dom.Document; @@ -327,7 +324,7 @@ public void testContainerLogs() throws IOException { final String filename = "logfile1"; final String logMessage = "log message\n"; nmContext.getApplications().put(appId, new ApplicationImpl(null, "user", - null, null, 0, appId, null, nmContext)); + appId, null, nmContext)); MockContainer container = new MockContainer(appAttemptId, new AsyncDispatcher(), new Configuration(), "user", appId, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index dd817d0..09f1c77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -94,6 +94,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -293,8 +294,11 @@ public FinishApplicationMasterResponse finishApplicationMaster( RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); + // Remove collector address when app get finished. - rmApp.removeCollectorAddr(); + if (TimelineUtils.timelineV2Enabled(getConfig())) { + rmApp.removeCollectorAddr(); + } // checking whether the app exits in RMStateStore at first not to throw // ApplicationDoesNotExistInCacheException before and after // RM work-preserving restart. @@ -562,8 +566,10 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); // add collector address for this application - allocateResponse.setCollectorAddr( - this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); + if (TimelineUtils.timelineV2Enabled(getConfig())) { + allocateResponse.setCollectorAddr( + this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); + } // add preemption to the allocateResponse message (if any) allocateResponse diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 213c226..998eeec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -571,24 +571,27 @@ public SubmitApplicationResponse submitApplication( throw RPCUtil.getRemoteException(ie); } - // Sanity check for flow run - String value = null; - try { - for (String tag : submissionContext.getApplicationTags()) { - if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || - tag.startsWith( - TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { - value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1); - Long.valueOf(value); + if (TimelineUtils.timelineV2Enabled(getConfig())) { + // Sanity check for flow run + String value = null; + try { + for (String tag : submissionContext.getApplicationTags()) { + if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || + tag.startsWith( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { + value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + + 1); + Long.valueOf(value); + } } + } catch (NumberFormatException e) { + LOG.warn("Invalid to flow run: " + value + + ". Flow run should be a long integer", e); + RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, + e.getMessage(), "ClientRMService", + "Exception in submitting application", applicationId); + throw RPCUtil.getRemoteException(e); } - } catch (NumberFormatException e) { - LOG.warn("Invalid to flow run: " + value + - ". Flow run should be a long integer", e); - RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, - e.getMessage(), "ClientRMService", - "Exception in submitting application", applicationId); - throw RPCUtil.getRemoteException(e); } // Check whether app has already been put into rmContext, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 261526e..a4451a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -364,8 +365,11 @@ private RMAppImpl createAndPopulateNewRMApp( LOG.warn(message); throw new YarnException(message); } - // Start timeline collector for the submitted app - application.startTimelineCollector(); + + if (TimelineUtils.timelineV2Enabled(conf)) { + // Start timeline collector for the submitted app + application.startTimelineCollector(); + } // Inform the ACLs Manager this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a5de053..3ab0195 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; @@ -380,6 +381,7 @@ protected SystemMetricsPublisher createSystemMetricsPublisher() { YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); SystemMetricsPublisher publisher = null; if (timelineServiceEnabled) { + // TODO we need a more explicit way to switch between v.1 and v.2... if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) { LOG.info("TimelineService V1 is configured"); @@ -515,10 +517,12 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(rmApplicationHistoryWriter); rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - RMTimelineCollectorManager timelineCollectorManager = - createRMTimelineCollectorManager(); - addService(timelineCollectorManager); - rmContext.setRMTimelineCollectorManager(timelineCollectorManager); + if (TimelineUtils.timelineV2Enabled(configuration)) { + RMTimelineCollectorManager timelineCollectorManager = + createRMTimelineCollectorManager(); + addService(timelineCollectorManager); + rmContext.setRMTimelineCollectorManager(timelineCollectorManager); + } // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index e7eee6b..738c19e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -445,10 +446,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) message); } - // Check & update collectors info from request. - // TODO make sure it won't have race condition issue for AM failed over case - // that the older registration could possible override the newer one. - updateAppCollectorsMap(request); + boolean timelineV2Enabled = TimelineUtils.timelineV2Enabled(getConfig()); + if (timelineV2Enabled) { + // Check & update collectors info from request. + // TODO make sure it won't have race condition issue for AM failed over + // case that the older registration could possible override the newer + // one. + updateAppCollectorsMap(request); + } // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils @@ -467,12 +472,12 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); } - // Return collectors' map that NM needs to know - // TODO we should optimize this to only include collector info that NM - // doesn't know yet. List keepAliveApps = remoteNodeStatus.getKeepAliveApplications(); - if (keepAliveApps != null) { + if (timelineV2Enabled && keepAliveApps != null) { + // Return collectors' map that NM needs to know + // TODO we should optimize this to only include collector info that NM + // doesn't know yet. setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index df760a3..d9d76a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -215,12 +215,14 @@ protected void setupTokens( .get(applicationId) .getSubmitTime())); - // Set flow context info - for (String tag : - rmContext.getRMApps().get(applicationId).getApplicationTags()) { - setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag); - setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag); - setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag); + if (TimelineUtils.timelineV2Enabled(conf)) { + // Set flow context info + for (String tag : + rmContext.getRMApps().get(applicationId).getApplicationTags()) { + setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag); + setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag); + setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag); + } } Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 05f682a..69671f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -913,15 +914,17 @@ public void transition(RMAppImpl app, RMAppEvent event) { extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { - LOG.info("Updating collector info for app: " + app.getApplicationId()); + if (TimelineUtils.timelineV2Enabled(app.conf)) { + LOG.info("Updating collector info for app: " + app.getApplicationId()); - RMAppCollectorUpdateEvent appCollectorUpdateEvent = - (RMAppCollectorUpdateEvent) event; - // Update collector address - app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr()); + RMAppCollectorUpdateEvent appCollectorUpdateEvent = + (RMAppCollectorUpdateEvent) event; + // Update collector address + app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr()); - // TODO persistent to RMStateStore for recover - // Save to RMStateStore + // TODO persistent to RMStateStore for recover + // Save to RMStateStore + } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 8215468..bdf7100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -641,6 +641,7 @@ public void handle(Event event) {} ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, mockAclsManager, mockQueueACLsManager, null); + rmService.init(new Configuration()); // without name and queue @@ -732,6 +733,7 @@ public void handle(Event event) {} ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, mockAclsManager, mockQueueACLsManager, null); + rmService.init(new Configuration()); // Initialize appnames and queues String[] queues = {QUEUE_1, QUEUE_2}; @@ -895,6 +897,7 @@ public void handle(Event rawEvent) { final ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, null, null, null); + rmService.init(new Configuration()); // submit an app and wait for it to block while in app submission Thread t = new Thread() {