diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java index 2ec4833..ef8481e 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; @@ -82,7 +83,7 @@ protected void writeEntities(Configuration tlConf, // create the app level timeline collector and start it AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId); - manager.putIfAbsent(appId, collector); + manager.putIfAbsent(collector); try { // parse the job info and configuration JobInfo jobInfo = @@ -134,7 +135,7 @@ protected void writeEntities(Configuration tlConf, context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). increment(numEntities); } finally { - manager.remove(appId); + manager.remove(CollectorData.toCollectorId(appId)); context.progress(); // move it along } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java index d66deb0..60210c7 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; @@ -55,7 +56,7 @@ protected void writeEntities(Configuration tlConf, // create the app level timeline collector AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId); - manager.putIfAbsent(appId, collector); + manager.putIfAbsent(collector); try { // set the context @@ -125,7 +126,7 @@ protected void writeEntities(Configuration tlConf, increment(kbs*testtimes); } finally { // clean up - manager.remove(appId); + manager.remove(CollectorData.toCollectorId(appId)); } } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java index 604a40b..3601779 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java @@ -18,20 +18,24 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.Records; public abstract class GetTimelineCollectorContextRequest { public static GetTimelineCollectorContextRequest newInstance( - ApplicationId appId) { + String id, String type) { GetTimelineCollectorContextRequest request = Records.newRecord(GetTimelineCollectorContextRequest.class); - request.setApplicationId(appId); + request.setId(id); + request.setType(type); return request; } - public abstract ApplicationId getApplicationId(); + public abstract String getId(); - public abstract void setApplicationId(ApplicationId appId); + public abstract void setId(String id); + + public abstract String getType(); + + public abstract void setType(String type); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index f238f79..dc03b5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -48,7 +48,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, MasterKey lastKnownNMTokenMasterKey, Set nodeLabels, - Map registeringCollectors) { + Map registeringCollectors) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -80,9 +80,9 @@ public abstract void setLogAggregationReportsForApps( List logAggregationReportsForApps); // This tells RM registered collectors' address info on this node - public abstract Map + public abstract Map getRegisteringCollectors(); - public abstract void setRegisteringCollectors(Map appCollectorsMap); + public abstract void setRegisteringCollectors( + Map collectorsMap); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index fa056d0..daf5146 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -43,9 +43,9 @@ List getApplicationsToCleanup(); // This tells NM the collectors' address info of related apps - Map getAppCollectorsMap(); - void setAppCollectorsMap( - Map appCollectorsMap); + Map getCollectorsMap(); + void setCollectorsMap( + Map appCollectorsMap); void setResponseId(int responseId); void setNodeAction(NodeAction action); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java index 1503eca..aa5f022 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java @@ -21,33 +21,33 @@ import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.util.Records; @Private public abstract class ReportNewCollectorInfoRequest { public static ReportNewCollectorInfoRequest newInstance( - List appCollectorsList) { + List appCollectorsList) { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); - request.setAppCollectorsList(appCollectorsList); + request.setCollectorsList(appCollectorsList); return request; } public static ReportNewCollectorInfoRequest newInstance( - ApplicationId id, String collectorAddr) { + String id, String collectorAddr, String type) { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); - request.setAppCollectorsList( - Arrays.asList(AppCollectorData.newInstance(id, collectorAddr))); + request.setCollectorsList( + Arrays.asList(CollectorData.newInstance( + id, collectorAddr, type))); return request; } - public abstract List getAppCollectorsList(); + public abstract List getCollectorsList(); - public abstract void setAppCollectorsList( - List appCollectorsList); + public abstract void setCollectorsList( + List collectorsList); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java index 7014388..e49cca4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java @@ -34,7 +34,8 @@ private GetTimelineCollectorContextRequestProto.Builder builder = null; private boolean viaProto = false; - private ApplicationId appId = null; + private String id = null; + private String type = null; public GetTimelineCollectorContextRequestPBImpl() { builder = GetTimelineCollectorContextRequestProto.newBuilder(); @@ -75,8 +76,8 @@ public String toString() { } private void mergeLocalToBuilder() { - if (appId != null) { - builder.setAppId(convertToProtoFormat(this.appId)); + if (id != null) { + builder.setId(this.id); } } @@ -97,36 +98,53 @@ private void maybeInitBuilder() { } @Override - public ApplicationId getApplicationId() { - if (this.appId != null) { - return this.appId; + public String getId() { + if (this.id != null) { + return this.id; } GetTimelineCollectorContextRequestProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasAppId()) { + if (!p.hasId()) { return null; } - this.appId = convertFromProtoFormat(p.getAppId()); - return this.appId; + this.id = p.getId(); + return this.id; } @Override - public void setApplicationId(ApplicationId id) { + public void setId(String id) { maybeInitBuilder(); if (id == null) { - builder.clearAppId(); + builder.clearId(); } - this.appId = id; + this.id = id; } - private ApplicationIdPBImpl convertFromProtoFormat( - YarnProtos.ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); + @Override + public String getType() { + if (this.type != null) { + return this.type; + } + + GetTimelineCollectorContextRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasType()) { + return null; + } + + this.type = p.getType(); + return this.type; } - private YarnProtos.ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl)t).getProto(); + @Override + public void setType(String type) { + maybeInitBuilder(); + if (type == null) { + builder.clearType(); + } + this.type = type; } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index b712a28..2d09179 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -26,23 +26,20 @@ import java.util.Map; import java.util.Set; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.CollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -59,7 +56,7 @@ private Set labels = null; private List logAggregationReportsForApps = null; - private Map registeredCollectors = null; + private Map registeringCollectors = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -115,8 +112,8 @@ private void mergeLocalToBuilder() { if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } - if (this.registeredCollectors != null) { - addRegisteredCollectorsToProto(); + if (this.registeringCollectors != null) { + addRegisteringCollectorsToProto(); } } @@ -159,14 +156,18 @@ private LogAggregationReportProto convertToProtoFormat( return ((LogAggregationReportPBImpl) value).getProto(); } - private void addRegisteredCollectorsToProto() { + private void addRegisteringCollectorsToProto() { maybeInitBuilder(); - builder.clearRegisteredCollectors(); - for (Map.Entry entry : - registeredCollectors.entrySet()) { - builder.addRegisteredCollectors(AppCollectorDataProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue().getCollectorAddr())); + builder.clearRegisteringCollectors(); + for (Map.Entry entry : + registeringCollectors.entrySet()) { + CollectorData data = entry.getValue(); + builder.addRegisteringCollectors(CollectorDataProto.newBuilder() + .setId(entry.getKey()) + .setCollectorAddr(data.getCollectorAddr()) + .setRmIdentifier(data.getRMIdentifier()) + .setVersion(data.getVersion()) + .setType(data.getType())); } } @@ -252,37 +253,38 @@ public void setLastKnownNMTokenMasterKey(MasterKey masterKey) { } @Override - public Map getRegisteringCollectors() { - if (this.registeredCollectors != null) { - return this.registeredCollectors; + public Map getRegisteringCollectors() { + if (this.registeringCollectors != null) { + return this.registeringCollectors; } initRegisteredCollectors(); - return registeredCollectors; + return registeringCollectors; } private void initRegisteredCollectors() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getRegisteredCollectorsList(); + List list = p.getRegisteringCollectorsList(); if (!list.isEmpty()) { - this.registeredCollectors = new HashMap<>(); - for (AppCollectorDataProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - AppCollectorData data = AppCollectorData.newInstance(appId, - c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); - this.registeredCollectors.put(appId, data); + this.registeringCollectors = new HashMap<>(); + for (CollectorDataProto c : list) { + String id = c.getId(); + CollectorData data = CollectorData.newInstance(c.getId(), + c.getCollectorAddr(), c.getType(), c.getRmIdentifier(), + c.getVersion()); + this.registeringCollectors.put(id, data); } } } @Override public void setRegisteringCollectors( - Map registeredCollectors) { - if (registeredCollectors == null || registeredCollectors.isEmpty()) { + Map registeringCollectors) { + if (registeringCollectors == null || registeringCollectors.isEmpty()) { return; } maybeInitBuilder(); - this.registeredCollectors = new HashMap<>(); - this.registeredCollectors.putAll(registeredCollectors); + this.registeringCollectors = new HashMap<>(); + this.registeringCollectors.putAll(registeringCollectors); } private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { @@ -293,14 +295,6 @@ private NodeStatusProto convertToProtoFormat(NodeStatus t) { return ((NodeStatusPBImpl)t).getProto(); } - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { return new MasterKeyPBImpl(p); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 6006b5b..282c292 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -46,12 +46,12 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.CollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -70,7 +70,7 @@ private List applicationsToCleanup = null; private Map systemCredentials = null; private Resource resource = null; - private Map appCollectorsMap = null; + private Map collectorsMap = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -128,7 +128,7 @@ private void mergeLocalToBuilder() { if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } - if (this.appCollectorsMap != null) { + if (this.collectorsMap != null) { addAppCollectorsMapToProto(); } } @@ -146,13 +146,14 @@ private void addSystemCredentialsToProto() { private void addAppCollectorsMapToProto() { maybeInitBuilder(); - builder.clearAppCollectorsMap(); - for (Map.Entry entry - : appCollectorsMap.entrySet()) { - AppCollectorData data = entry.getValue(); - builder.addAppCollectorsMap(AppCollectorDataProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(data.getCollectorAddr()) + builder.clearCollectorsMap(); + for (Map.Entry entry + : collectorsMap.entrySet()) { + CollectorData data = entry.getValue(); + builder.addCollectorsMap(CollectorDataProto.newBuilder() + .setId(entry.getKey()) + .setCollectorAddr(data.getCollectorAddr()) + .setType(data.getType()) .setRmIdentifier(data.getRMIdentifier()) .setVersion(data.getVersion())); } @@ -572,12 +573,12 @@ public void remove() { } @Override - public Map getAppCollectorsMap() { - if (this.appCollectorsMap != null) { - return this.appCollectorsMap; + public Map getCollectorsMap() { + if (this.collectorsMap != null) { + return this.collectorsMap; } - initAppCollectorsMap(); - return appCollectorsMap; + initCollectorsMap(); + return collectorsMap; } private void initSystemCredentials() { @@ -591,16 +592,17 @@ private void initSystemCredentials() { } } - private void initAppCollectorsMap() { + private void initCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getAppCollectorsMapList(); + List list = p.getCollectorsMapList(); if (!list.isEmpty()) { - this.appCollectorsMap = new HashMap<>(); - for (AppCollectorDataProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - AppCollectorData data = AppCollectorData.newInstance(appId, - c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); - this.appCollectorsMap.put(appId, data); + this.collectorsMap = new HashMap<>(); + for (CollectorDataProto c : list) { + String id = c.getId(); + CollectorData data = CollectorData.newInstance(id, + c.getCollectorAddr(), c.getType(), c.getRmIdentifier(), + c.getVersion()); + this.collectorsMap.put(id, data); } } } @@ -617,14 +619,14 @@ public void setSystemCredentialsForApps( } @Override - public void setAppCollectorsMap( - Map appCollectorsMap) { - if (appCollectorsMap == null || appCollectorsMap.isEmpty()) { + public void setCollectorsMap( + Map collectorsMap) { + if (collectorsMap == null || collectorsMap.isEmpty()) { return; } maybeInitBuilder(); - this.appCollectorsMap = new HashMap<>(); - this.appCollectorsMap.putAll(appCollectorsMap); + this.collectorsMap = new HashMap<>(); + this.collectorsMap.putAll(collectorsMap); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java index 3f3dcf5..d934218 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java @@ -20,12 +20,12 @@ import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.CollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; -import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; +import org.apache.hadoop.yarn.server.api.records.CollectorData; +import org.apache.hadoop.yarn.server.api.records.impl.pb.CollectorDataPBImpl; public class ReportNewCollectorInfoRequestPBImpl extends ReportNewCollectorInfoRequest { @@ -36,7 +36,7 @@ private ReportNewCollectorInfoRequestProto.Builder builder = null; private boolean viaProto = false; - private List collectorsList = null; + private List collectorsList = null; public ReportNewCollectorInfoRequestPBImpl() { builder = ReportNewCollectorInfoRequestProto.newBuilder(); @@ -95,27 +95,27 @@ private void maybeInitBuilder() { private void addLocalCollectorsToProto() { maybeInitBuilder(); - builder.clearAppCollectors(); - List protoList = - new ArrayList(); - for (AppCollectorData m : this.collectorsList) { + builder.clearCollectors(); + List protoList = + new ArrayList(); + for (CollectorData m : this.collectorsList) { protoList.add(convertToProtoFormat(m)); } - builder.addAllAppCollectors(protoList); + builder.addAllCollectors(protoList); } private void initLocalCollectorsList() { ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = - p.getAppCollectorsList(); - this.collectorsList = new ArrayList(); - for (AppCollectorDataProto m : list) { + List list = + p.getCollectorsList(); + this.collectorsList = new ArrayList(); + for (CollectorDataProto m : list) { this.collectorsList.add(convertFromProtoFormat(m)); } } @Override - public List getAppCollectorsList() { + public List getCollectorsList() { if (this.collectorsList == null) { initLocalCollectorsList(); } @@ -123,22 +123,22 @@ private void initLocalCollectorsList() { } @Override - public void setAppCollectorsList(List appCollectorsList) { + public void setCollectorsList(List appCollectorsList) { maybeInitBuilder(); if (appCollectorsList == null) { - builder.clearAppCollectors(); + builder.clearCollectors(); } this.collectorsList = appCollectorsList; } - private AppCollectorDataPBImpl convertFromProtoFormat( - AppCollectorDataProto p) { - return new AppCollectorDataPBImpl(p); + private CollectorDataPBImpl convertFromProtoFormat( + CollectorDataProto p) { + return new CollectorDataPBImpl(p); } - private AppCollectorDataProto convertToProtoFormat( - AppCollectorData m) { - return ((AppCollectorDataPBImpl) m).getProto(); + private CollectorDataProto convertToProtoFormat( + CollectorData m) { + return ((CollectorDataPBImpl) m).getProto(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java deleted file mode 100644 index 3714817..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.api.records; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.util.Records; - - -@Private -@InterfaceStability.Unstable -public abstract class AppCollectorData { - - public static final long UNSTAMPED_VERSION_NUMBER = -1; - - public static AppCollectorData newInstance( - ApplicationId id, String collectorAddr, long rmIdentifier, long version) { - AppCollectorData appCollectorData = - Records.newRecord(AppCollectorData.class); - appCollectorData.setApplicationId(id); - appCollectorData.setCollectorAddr(collectorAddr); - appCollectorData.setRMIdentifier(rmIdentifier); - appCollectorData.setVersion(version); - return appCollectorData; - } - - public static AppCollectorData newInstance(ApplicationId id, - String collectorAddr) { - return newInstance(id, collectorAddr, UNSTAMPED_VERSION_NUMBER, - UNSTAMPED_VERSION_NUMBER); - } - - /** - * Returns if a collector data item happens before another one. Null data - * items happens before any other non-null items. Non-null data items A - * happens before another non-null item B when A's rmIdentifier is less than - * B's rmIdentifier. Or A's version is less than B's if they have the same - * rmIdentifier. - * - * @param dataA first collector data item. - * @param dataB second collector data item. - * @return true if dataA happens before dataB. - */ - public static boolean happensBefore(AppCollectorData dataA, - AppCollectorData dataB) { - if (dataA == null && dataB == null) { - return false; - } else if (dataA == null || dataB == null) { - return dataA == null; - } - - return - (dataA.getRMIdentifier() < dataB.getRMIdentifier()) - || ((dataA.getRMIdentifier() == dataB.getRMIdentifier()) - && (dataA.getVersion() < dataB.getVersion())); - } - - /** - * Returns if the collector data has been stamped with a RM time stamp. - * - * @return true if RM has already assigned a time stamp for this collector. - * Otherwise, it means the RM has not recognized the existence of this - * collector. - */ - public boolean stamped() { - return (getRMIdentifier() != UNSTAMPED_VERSION_NUMBER) - || (getVersion() != UNSTAMPED_VERSION_NUMBER); - } - - public abstract ApplicationId getApplicationId(); - - public abstract void setApplicationId(ApplicationId id); - - public abstract String getCollectorAddr(); - - public abstract void setCollectorAddr(String addr); - - public abstract long getRMIdentifier(); - - public abstract void setRMIdentifier(long rmId); - - public abstract long getVersion(); - - public abstract void setVersion(long version); - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/CollectorData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/CollectorData.java new file mode 100644 index 0000000..8a008d6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/CollectorData.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Records; + + +@Private +@InterfaceStability.Unstable +public abstract class CollectorData { + + public static final long UNSTAMPED_VERSION_NUMBER = -1; + + public enum Type { + BASE, APP_LEVEL + } + + public static CollectorData newInstance(String id, String collectorAddr, + String type, long rmIdentifier, long version) { + CollectorData collectorData = + Records.newRecord(CollectorData.class); + collectorData.setId(id); + collectorData.setCollectorAddr(collectorAddr); + collectorData.setRMIdentifier(rmIdentifier); + collectorData.setVersion(version); + collectorData.setType(type); + return collectorData; + } + + public static CollectorData newInstance(String id, String collectorAddr, + String type) { + return newInstance(id, collectorAddr, type, UNSTAMPED_VERSION_NUMBER, + UNSTAMPED_VERSION_NUMBER); + } + + /** + * Returns if a collector data item happens before another one. Null data + * items happens before any other non-null items. Non-null data items A + * happens before another non-null item B when A's rmIdentifier is less than + * B's rmIdentifier. Or A's version is less than B's if they have the same + * rmIdentifier. + * + * @param dataA first collector data item. + * @param dataB second collector data item. + * @return true if dataA happens before dataB. + */ + public static boolean happensBefore(CollectorData dataA, + CollectorData dataB) { + if (dataA == null && dataB == null) { + return false; + } else if (dataA == null || dataB == null) { + return dataA == null; + } + + return + (dataA.getRMIdentifier() < dataB.getRMIdentifier()) + || ((dataA.getRMIdentifier() == dataB.getRMIdentifier()) + && (dataA.getVersion() < dataB.getVersion())); + } + + /** + * Returns if the collector data has been stamped with a RM time stamp. + * + * @return true if RM has already assigned a time stamp for this collector. + * Otherwise, it means the RM has not recognized the existence of this + * collector. + */ + public boolean stamped() { + return (getRMIdentifier() != UNSTAMPED_VERSION_NUMBER) + || (getVersion() != UNSTAMPED_VERSION_NUMBER); + } + + /** + * Build a collector id from an app id. + * @param appId Provided application id. + * @return a string representing the collector id. + */ + public static String toCollectorId(ApplicationId appId) { + return appId.toString(); + } + + public ApplicationId appIdFromCollectorId() throws YarnException { + if (Type.APP_LEVEL.name().equals(this.getType())) { + return ApplicationId.fromString(this.getId()); + } else { + throw new UnsupportedOperationException( + "Cannot convert collector id to application id for collector " + + this.getId() + " with type " + this.getType()); + } + } + + + public abstract String getId(); + + public abstract void setId(String id); + + public abstract String getType(); + + public abstract void setType(String type); + + public abstract String getCollectorAddr(); + + public abstract void setCollectorAddr(String addr); + + public abstract long getRMIdentifier(); + + public abstract void setRMIdentifier(long rmId); + + public abstract long getVersion(); + + public abstract void setVersion(long version); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java deleted file mode 100644 index 8823f59..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java +++ /dev/null @@ -1,200 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.yarn.server.api.records.impl.pb; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; - -import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder; - -import com.google.protobuf.TextFormat; - -@Private -@Unstable -public class AppCollectorDataPBImpl extends AppCollectorData { - - private AppCollectorDataProto proto = - AppCollectorDataProto.getDefaultInstance(); - - private AppCollectorDataProto.Builder builder = null; - private boolean viaProto = false; - - private ApplicationId appId = null; - private String collectorAddr = null; - private Long rmIdentifier = null; - private Long version = null; - - public AppCollectorDataPBImpl() { - builder = AppCollectorDataProto.newBuilder(); - } - - public AppCollectorDataPBImpl(AppCollectorDataProto proto) { - this.proto = proto; - viaProto = true; - } - - public AppCollectorDataProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - @Override - public int hashCode() { - return getProto().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - if (other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); - } - return false; - } - - @Override - public String toString() { - return TextFormat.shortDebugString(getProto()); - } - - @Override - public ApplicationId getApplicationId() { - AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; - if (this.appId == null && p.hasAppId()) { - this.appId = convertFromProtoFormat(p.getAppId()); - } - return this.appId; - } - - @Override - public String getCollectorAddr() { - AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; - if (this.collectorAddr == null - && p.hasAppCollectorAddr()) { - this.collectorAddr = p.getAppCollectorAddr(); - } - return this.collectorAddr; - } - - @Override - public void setApplicationId(ApplicationId id) { - maybeInitBuilder(); - if (id == null) { - builder.clearAppId(); - } - this.appId = id; - } - - @Override - public void setCollectorAddr(String collectorAddr) { - maybeInitBuilder(); - if (collectorAddr == null) { - builder.clearAppCollectorAddr(); - } - this.collectorAddr = collectorAddr; - } - - @Override - public long getRMIdentifier() { - AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; - if (this.rmIdentifier == null && p.hasRmIdentifier()) { - this.rmIdentifier = p.getRmIdentifier(); - } - if (this.rmIdentifier != null) { - return this.rmIdentifier; - } else { - return AppCollectorData.UNSTAMPED_VERSION_NUMBER; - } - } - - @Override - public void setRMIdentifier(long rmId) { - maybeInitBuilder(); - this.rmIdentifier = rmId; - builder.setRmIdentifier(rmId); - } - - @Override - public long getVersion() { - AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; - if (this.version == null && p.hasRmIdentifier()) { - this.version = p.getRmIdentifier(); - } - if (this.version != null) { - return this.version; - } else { - return AppCollectorData.UNSTAMPED_VERSION_NUMBER; - } - } - - @Override - public void setVersion(long version) { - maybeInitBuilder(); - this.version = version; - builder.setVersion(version); - } - - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = AppCollectorDataProto.newBuilder(proto); - } - viaProto = false; - } - - private void mergeLocalToProto() { - if (viaProto) { - maybeInitBuilder(); - } - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void mergeLocalToBuilder() { - if (this.appId != null) { - builder.setAppId(convertToProtoFormat(this.appId)); - } - if (this.collectorAddr != null) { - builder.setAppCollectorAddr(this.collectorAddr); - } - if (this.rmIdentifier != null) { - builder.setRmIdentifier(this.rmIdentifier); - } - if (this.version != null) { - builder.setVersion(this.version); - } - } - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/CollectorDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/CollectorDataPBImpl.java new file mode 100644 index 0000000..9ecd553 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/CollectorDataPBImpl.java @@ -0,0 +1,212 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.api.records.CollectorData; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.CollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.CollectorDataProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class CollectorDataPBImpl extends CollectorData { + + private CollectorDataProto proto = + CollectorDataProto.getDefaultInstance(); + + private CollectorDataProto.Builder builder = null; + private boolean viaProto = false; + + private String id = null; + private String collectorAddr = null; + private String type = null; + private Long rmIdentifier = null; + private Long version = null; + + public CollectorDataPBImpl() { + builder = CollectorDataProto.newBuilder(); + } + + public CollectorDataPBImpl(CollectorDataProto proto) { + this.proto = proto; + viaProto = true; + } + + public CollectorDataProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public String getId() { + CollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.id == null && p.hasId()) { + this.id = p.getId(); + } + return this.id; + } + + @Override + public String getCollectorAddr() { + CollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorAddr == null + && p.hasCollectorAddr()) { + this.collectorAddr = p.getCollectorAddr(); + } + return this.collectorAddr; + } + + @Override + public void setId(String id) { + maybeInitBuilder(); + if (id == null) { + builder.clearId(); + } + this.id = id; + } + + @Override + public String getType() { + CollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.type == null && p.hasType()) { + this.type = p.getType(); + } + return this.type; + } + + @Override + public void setType(String type) { + maybeInitBuilder(); + if (type == null) { + builder.clearType(); + } + this.type = type; + + } + + @Override + public void setCollectorAddr(String collectorAddr) { + maybeInitBuilder(); + if (collectorAddr == null) { + builder.clearCollectorAddr(); + } + this.collectorAddr = collectorAddr; + } + + @Override + public long getRMIdentifier() { + CollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.rmIdentifier == null && p.hasRmIdentifier()) { + this.rmIdentifier = p.getRmIdentifier(); + } + if (this.rmIdentifier != null) { + return this.rmIdentifier; + } else { + return CollectorData.UNSTAMPED_VERSION_NUMBER; + } + } + + @Override + public void setRMIdentifier(long rmId) { + maybeInitBuilder(); + this.rmIdentifier = rmId; + builder.setRmIdentifier(rmId); + } + + @Override + public long getVersion() { + CollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.version == null && p.hasRmIdentifier()) { + this.version = p.getRmIdentifier(); + } + if (this.version != null) { + return this.version; + } else { + return CollectorData.UNSTAMPED_VERSION_NUMBER; + } + } + + @Override + public void setVersion(long version) { + maybeInitBuilder(); + this.version = version; + builder.setVersion(version); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = CollectorDataProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.id != null) { + builder.setId(this.id); + } + if (this.collectorAddr != null) { + builder.setCollectorAddr(this.collectorAddr); + } + if (this.rmIdentifier != null) { + builder.setRmIdentifier(this.rmIdentifier); + } + if (this.version != null) { + builder.setVersion(this.version); + } + if (this.type != null) { + builder.setType(this.type); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index e9c6a3b..3fd6c84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -84,7 +84,7 @@ message NodeHeartbeatRequestProto { optional MasterKeyProto last_known_nm_token_master_key = 3; optional NodeLabelsProto nodeLabels = 4; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; - repeated AppCollectorDataProto registered_collectors = 6; + repeated CollectorDataProto registering_collectors = 6; } message LogAggregationReportProto { @@ -109,7 +109,7 @@ message NodeHeartbeatResponseProto { repeated SignalContainerRequestProto containers_to_signal = 13; optional ResourceProto resource = 14; optional ContainerQueuingLimitProto container_queuing_limit = 15; - repeated AppCollectorDataProto app_collectors_map = 16; + repeated CollectorDataProto collectors_map = 16; } message ContainerQueuingLimitProto { @@ -125,25 +125,27 @@ message SystemCredentialsForAppsProto { //////////////////////////////////////////////////////////////////////// ////// From collector_nodemanager_protocol //////////////////////////// //////////////////////////////////////////////////////////////////////// -message AppCollectorDataProto { - optional ApplicationIdProto app_id = 1; - optional string app_collector_addr = 2; +message CollectorDataProto { + optional string id = 1; + optional string collector_addr = 2; optional int64 rm_identifier = 3 [default = -1]; optional int64 version = 4 [default = -1]; + optional string type = 5; } ////////////////////////////////////////////////////// /////// collector_nodemanager_protocol ////////////// ////////////////////////////////////////////////////// message ReportNewCollectorInfoRequestProto { - repeated AppCollectorDataProto app_collectors = 1; + repeated CollectorDataProto collectors = 1; } message ReportNewCollectorInfoResponseProto { } message GetTimelineCollectorContextRequestProto { - optional ApplicationIdProto appId = 1; + optional string id = 1; + optional string type = 2; } message GetTimelineCollectorContextResponseProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 3be8dfd..4a141c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -70,7 +70,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -87,6 +87,9 @@ public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0"; + public static final String DEFAULT_COLLECTOR_TYPE + = CollectorData.Type.APP_LEVEL.name(); + public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, 0); @@ -165,8 +168,8 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException { // normally response. try { ReportNewCollectorInfoRequest request = - ReportNewCollectorInfoRequest.newInstance( - DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR); + ReportNewCollectorInfoRequest.newInstance(DEFAULT_APP_ID.toString(), + DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TYPE); proxy.reportNewCollectorInfo(request); } catch (YarnException e) { Assert.fail("RPC call failured is not expected here."); @@ -186,7 +189,8 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException { try { GetTimelineCollectorContextRequest request = GetTimelineCollectorContextRequest.newInstance( - ApplicationId.newInstance(0, 1)); + CollectorData.toCollectorId(ApplicationId.newInstance(0, 1)), + CollectorData.Type.APP_LEVEL.name()); GetTimelineCollectorContextResponse response = proxy.getTimelineCollectorContext(request); Assert.assertEquals("test_user_id", response.getUserId()); @@ -201,7 +205,8 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException { try { GetTimelineCollectorContextRequest request = GetTimelineCollectorContextRequest.newInstance( - ApplicationId.newInstance(0, 2)); + CollectorData.toCollectorId(ApplicationId.newInstance(0, 2)), + CollectorData.Type.APP_LEVEL.name()); proxy.getTimelineCollectorContext(request); Assert.fail("RPC call failured is expected here."); } catch (YarnException | IOException e) { @@ -390,14 +395,15 @@ public static Token newContainerToken(NodeId nodeId, byte[] password, public ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException { - List appCollectors = request.getAppCollectorsList(); + List appCollectors = request.getCollectorsList(); if (appCollectors.size() == 1) { // check default appID and collectorAddr - AppCollectorData appCollector = appCollectors.get(0); - Assert.assertEquals(appCollector.getApplicationId(), - DEFAULT_APP_ID); + CollectorData appCollector = appCollectors.get(0); + Assert.assertEquals(appCollector.getId(), + CollectorData.toCollectorId(DEFAULT_APP_ID)); Assert.assertEquals(appCollector.getCollectorAddr(), DEFAULT_COLLECTOR_ADDR); + Assert.assertEquals(appCollector.getType(), DEFAULT_COLLECTOR_TYPE); } else { throw new YarnException(ILLEGAL_NUMBER_MESSAGE); } @@ -412,7 +418,7 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( public GetTimelineCollectorContextResponse getTimelineCollectorContext( GetTimelineCollectorContextRequest request) throws YarnException, IOException { - if (request.getApplicationId().getId() == 1) { + if (ApplicationId.fromString(request.getId()).getId() == 1) { return GetTimelineCollectorContextResponse.newInstance( "test_user_id", "test_flow_name", "test_flow_version", 12345678L); } else { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 153ea4c..54e34a0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -48,7 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -110,7 +110,7 @@ public void testNodeHeartbeatRequestPBImpl() { original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setNodeStatus(getNodeStatus()); original.setNodeLabels(getValidNodeLabels()); - Map collectors = getCollectors(); + Map collectors = getCollectors(); original.setRegisteringCollectors(collectors); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); @@ -154,8 +154,8 @@ public void testNodeHeartbeatResponsePBImpl() { original.setNextHeartBeatInterval(1000); original.setNodeAction(NodeAction.NORMAL); original.setResponseId(100); - Map collectors = getCollectors(); - original.setAppCollectorsMap(collectors); + Map collectors = getCollectors(); + original.setCollectorsMap(collectors); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); @@ -165,7 +165,7 @@ public void testNodeHeartbeatResponsePBImpl() { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); - assertEquals(collectors, copy.getAppCollectorsMap()); + assertEquals(collectors, copy.getCollectorsMap()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); } @@ -345,13 +345,23 @@ public void testUnRegisterNodeManagerRequestPBImpl() throws Exception { return nodeLabels; } - private Map getCollectors() { + private Map getCollectors() { ApplicationId appID = ApplicationId.newInstance(1L, 1); String collectorAddr = "localhost:0"; - AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); - Map collectorMap = + CollectorData data = CollectorData.newInstance( + CollectorData.toCollectorId(appID), + collectorAddr, CollectorData.Type.APP_LEVEL.name()); + Map collectorMap = new HashMap<>(); - collectorMap.put(appID, data); + collectorMap.put(CollectorData.toCollectorId(appID), data); + ApplicationId appID1 = ApplicationId.newInstance(2L, 2); + String collectorAddr1 = "localhost:1"; + CollectorData data1 = CollectorData.newInstance( + CollectorData.toCollectorId(appID1), + collectorAddr1, CollectorData.Type.APP_LEVEL.name(), 1, 1); + collectorMap.put(CollectorData.toCollectorId(appID1), data1); + // TODO: Add new types of collectors once we have something more than app + // level collector. return collectorMap; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 923ce62..b1b9919 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -79,9 +79,9 @@ * @return registered collectors, or null if the timeline service v.2 is not * enabled */ - Map getRegisteringCollectors(); + Map getRegisteringCollectors(); - Map getKnownCollectors(); + Map getKnownCollectors(); ConcurrentMap getContainers(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 96e48a0..36024d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -58,7 +58,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; @@ -485,9 +485,9 @@ public void run() { protected final ConcurrentMap containers = new ConcurrentSkipListMap(); - private Map registeringCollectors; + private Map registeringCollectors; - private Map knownCollectors; + private Map knownCollectors; protected final ConcurrentMap increasedContainers = @@ -683,12 +683,12 @@ public OpportunisticContainerAllocator getContainerAllocator() { } @Override - public Map getRegisteringCollectors() { + public Map getRegisteringCollectors() { return this.registeringCollectors; } @Override - public Map getKnownCollectors() { + public Map getKnownCollectors() { return this.knownCollectors; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 97dc1bf..70836d6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -74,7 +74,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -525,19 +525,19 @@ private void updateNMResource(Resource resource) { * called when the RM needs to resync with the node. */ private void reregisterCollectors() { - Map knownCollectors + Map knownCollectors = this.context.getKnownCollectors(); - Map registeringCollectors + Map registeringCollectors = this.context.getRegisteringCollectors(); - for (Map.Entry entry + for (Map.Entry entry : knownCollectors.entrySet()) { registeringCollectors.putIfAbsent(entry.getKey(), entry.getValue()); } if (LOG.isDebugEnabled()) { LOG.debug("List of reregistering collectors: "); - for (Map.Entry entry + for (Map.Entry entry : registeringCollectors.entrySet()) { - AppCollectorData data = entry.getValue(); + CollectorData data = entry.getValue(); LOG.debug(entry.getKey() + " : " + data.getCollectorAddr() + "@<" + data.getRMIdentifier() + ", " + data.getVersion() + ">"); } @@ -971,45 +971,52 @@ public void run() { } private void updateTimelineCollectorData( - NodeHeartbeatResponse response) { - Map incomingCollectorsMap = - response.getAppCollectorsMap(); + NodeHeartbeatResponse response) throws YarnException { + Map incomingCollectorsMap = + response.getCollectorsMap(); if (incomingCollectorsMap == null) { if (LOG.isDebugEnabled()) { LOG.debug("No collectors to update RM"); } return; } - Map knownCollectors + Map knownCollectors = context.getKnownCollectors(); - for (Map.Entry entry + for (Map.Entry entry : incomingCollectorsMap.entrySet()) { - ApplicationId appId = entry.getKey(); - AppCollectorData collectorData = entry.getValue(); - // Only handle applications running on local node. - Application application = context.getApplications().get(appId); - if (application != null) { - // Update collector data if the newly received data happens after - // the known data (updates the known data). - AppCollectorData existingData = knownCollectors.get(appId); - if (existingData == null || AppCollectorData.happensBefore( - existingData, collectorData)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sync a new collector address: " - + collectorData.getCollectorAddr() - + " for application: " + appId + " from RM."); - } - - // Update information for clients. - NMTimelinePublisher nmTimelinePublisher - = context.getNMTimelinePublisher(); - if (nmTimelinePublisher != null) { - nmTimelinePublisher.setTimelineServiceAddress( - application.getAppId(), collectorData.getCollectorAddr()); + CollectorData collectorData = entry.getValue(); + // Only check for app level collectors for now. + if (CollectorData.Type.APP_LEVEL.name().equals( + collectorData.getType())) { + CollectorData data = entry.getValue(); + String id = data.getId(); + // Only handle applications running on local node. + // For app level collectors, id should be their app id. + Application application = context.getApplications().get( + data.appIdFromCollectorId()); + if (application != null) { + // Update collector data if the newly received data happens after + // the known data (updates the known data). + CollectorData existingData = knownCollectors.get(id); + if (existingData == null || CollectorData.happensBefore( + existingData, collectorData)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sync a new collector address: " + + collectorData.getCollectorAddr() + + " for collector: " + id + " from RM."); + } + + // Update information for clients. + NMTimelinePublisher nmTimelinePublisher + = context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.setTimelineServiceAddress( + application.getAppId(), collectorData.getCollectorAddr()); + } + // Update information for the node manager itself. + knownCollectors.put(id, collectorData); + context.getRegisteringCollectors().remove(id); } - // Update information for the node manager itself. - knownCollectors.put(appId, collectorData); - context.getRegisteringCollectors().remove(appId); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index 7fdca78..ad5c0e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; @@ -106,24 +106,28 @@ public void serviceStop() throws Exception { @Override public ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException { - List newCollectorsList = request.getAppCollectorsList(); + List newCollectorsList = request.getCollectorsList(); if (newCollectorsList != null && !newCollectorsList.isEmpty()) { - Map newCollectorsMap = + Map newCollectorsMap = new HashMap<>(); - for (AppCollectorData collector : newCollectorsList) { - ApplicationId appId = collector.getApplicationId(); - newCollectorsMap.put(appId, collector); - // set registered collector address to TimelineClient. - // TODO: Do we need to do this after we received confirmation from - // the RM? - NMTimelinePublisher nmTimelinePublisher = - context.getNMTimelinePublisher(); - if (nmTimelinePublisher != null) { - nmTimelinePublisher.setTimelineServiceAddress(appId, - collector.getCollectorAddr()); + for (CollectorData collector : newCollectorsList) { + String id = collector.getId(); + newCollectorsMap.put(id, collector); + if (CollectorData.Type.APP_LEVEL.name().equals( + collector.getType())) { + // Set registered collector address to TimelineClient if the collector + // is app level collector. + // TODO: Do we need to do this after we received confirmation from + // the RM? + NMTimelinePublisher nmTimelinePublisher = + context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.setTimelineServiceAddress( + ApplicationId.fromString(id), collector.getCollectorAddr()); + } } } - Map registeringCollectors + Map registeringCollectors = context.getRegisteringCollectors(); if (registeringCollectors != null) { registeringCollectors.putAll(newCollectorsMap); @@ -140,9 +144,13 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( public GetTimelineCollectorContextResponse getTimelineCollectorContext( GetTimelineCollectorContextRequest request) throws YarnException, IOException { - Application app = context.getApplications().get(request.getApplicationId()); + // For now, this method will only be called for app level collectors. + // However, in future, it needs to handle different types of collectors + // according to the type + ApplicationId appId = ApplicationId.fromString(request.getId()); + Application app = context.getApplications().get(appId); if (app == null) { - throw new YarnException("Application " + request.getApplicationId() + + throw new YarnException("Application " + appId + " doesn't exist on NM."); } return GetTimelineCollectorContextResponse.newInstance( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 91f40a3..790900d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; @@ -556,10 +556,10 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { // Remove collectors info for finished apps. // TODO check we remove related collectors info in failure cases // (YARN-3038) - Map knownCollectors = + Map knownCollectors = app.context.getKnownCollectors(); if (knownCollectors != null) { - knownCollectors.remove(app.getAppId()); + knownCollectors.remove(CollectorData.toCollectorId(app.getAppId())); } // stop timelineClient when application get finished. NMTimelinePublisher nmTimelinePublisher = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index db1b3b9..87953ed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -58,7 +58,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -618,11 +618,13 @@ public int getHttpPort() { return null; } - public Map getRegisteringCollectors() { + @Override + public Map getRegisteringCollectors() { return null; } - @Override public Map getKnownCollectors() { + @Override + public Map getKnownCollectors() { return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 7ec710e..160c43f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -528,7 +528,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) YarnConfiguration.timelineServiceV2Enabled(getConfig()); if (timelineV2Enabled) { // Check & update collectors info from request. - updateAppCollectorsMap(request); + updateCollectorsMap(request); } // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED. @@ -613,14 +613,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) private void setAppCollectorsMapToResponse( List runningApps, NodeHeartbeatResponse response) { - Map liveAppCollectorsMap = new + Map liveCollectorsMap = new HashMap<>(); Map rmApps = rmContext.getRMApps(); // Set collectors for all running apps on this node. + // Only works for app level collectors for now. for (ApplicationId appId : runningApps) { - AppCollectorData appCollectorData = rmApps.get(appId).getCollectorData(); - if (appCollectorData != null) { - liveAppCollectorsMap.put(appId, appCollectorData); + CollectorData collectorData = rmApps.get(appId).getCollectorData(); + if (collectorData != null) { + liveCollectorsMap.put(collectorData.getId(), collectorData); } else { if (LOG.isDebugEnabled()) { LOG.debug("Collector for applicaton: " + appId + @@ -628,44 +629,50 @@ private void setAppCollectorsMapToResponse( } } } - response.setAppCollectorsMap(liveAppCollectorsMap); + response.setCollectorsMap(liveCollectorsMap); } - private void updateAppCollectorsMap(NodeHeartbeatRequest request) { - Map registeringCollectorsMap = + private void updateCollectorsMap(NodeHeartbeatRequest request) + throws YarnException { + Map registeringCollectorsMap = request.getRegisteringCollectors(); if (registeringCollectorsMap != null && !registeringCollectorsMap.isEmpty()) { Map rmApps = rmContext.getRMApps(); - for (Map.Entry entry: + for (Map.Entry entry: registeringCollectorsMap.entrySet()) { - ApplicationId appId = entry.getKey(); - AppCollectorData collectorData = entry.getValue(); - if (collectorData != null) { - if (!collectorData.stamped()) { - // Stamp the collector if we have not done so - collectorData.setRMIdentifier( - ResourceManager.getClusterTimeStamp()); - collectorData.setVersion( - timelineCollectorVersion.getAndIncrement()); - } - RMApp rmApp = rmApps.get(appId); - if (rmApp == null) { - LOG.warn("Cannot update collector info because application ID: " + - appId + " is not found in RMContext!"); - } else { - AppCollectorData previousCollectorData = rmApp.getCollectorData(); - if (previousCollectorData == null || AppCollectorData.happensBefore( - previousCollectorData, collectorData)) { - // Sending collector update event. - // Note: RM has to store the newly received collector data - // synchronously. Otherwise, the RM may send out stale collector - // data before this update is done, and the RM then crashes, the - // newly updated collector data will get lost. - LOG.info("Update collector information for application " + appId - + " with new address: " + collectorData.getCollectorAddr()); - rmApp.setCollectorData(collectorData); - } + CollectorData collectorData = entry.getValue(); + // For now, only works for app level collectors. + if (collectorData == null || + !CollectorData.Type.APP_LEVEL.name().equals( + collectorData.getType())) { + continue; + } + ApplicationId appId = entry.getValue().appIdFromCollectorId(); + + if (!collectorData.stamped()) { + // Stamp the collector if we have not done so + collectorData.setRMIdentifier( + ResourceManager.getClusterTimeStamp()); + collectorData.setVersion( + timelineCollectorVersion.getAndIncrement()); + } + RMApp rmApp = rmApps.get(appId); + if (rmApp == null) { + LOG.warn("Cannot update collector info because application ID: " + + appId + " is not found in RMContext!"); + } else { + CollectorData previousCollectorData = rmApp.getCollectorData(); + if (previousCollectorData == null || CollectorData.happensBefore( + previousCollectorData, collectorData)) { + // Sending collector update event. + // Note: RM has to store the newly received collector data + // synchronously. Otherwise, the RM may send out stale collector + // data before this update is done, and the RM then crashes, the + // newly updated collector data will get lost. + LOG.info("Update collector information for application " + appId + + " with new address: " + collectorData.getCollectorAddr()); + rmApp.setCollectorData(collectorData); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index a248199..6d11aac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; @@ -392,7 +393,7 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) { + TimelineUtils.dumpTimelineRecordtoJSON(entity)); } TimelineCollector timelineCollector = - rmTimelineCollectorManager.get(appId); + rmTimelineCollectorManager.get(CollectorData.toCollectorId(appId)); TimelineEntities entities = new TimelineEntities(); entities.addEntity(entity); timelineCollector.putEntities(entities, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 4d0c1d8..91785a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -188,8 +188,7 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * enabled. */ @InterfaceAudience.Private - @InterfaceStability.Unstable - AppCollectorData getCollectorData(); + @InterfaceStability.Unstable CollectorData getCollectorData(); /** * Set timeline collector data for the application. It is the caller's @@ -204,7 +203,7 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, */ @InterfaceAudience.Private @InterfaceStability.Unstable - void setCollectorData(AppCollectorData data); + void setCollectorData(CollectorData data); /** * Remove collector data when application is finished or killed. It should diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index c711b3e..34770eb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -73,7 +73,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -157,7 +157,7 @@ private long storedFinishTime = 0; private int firstAttemptIdInStateStore = 1; private int nextAttemptId = 1; - private AppCollectorData collectorData; + private CollectorData collectorData; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -521,8 +521,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, public void startTimelineCollector() { AppLevelTimelineCollector collector = new AppLevelTimelineCollector(applicationId); - rmContext.getRMTimelineCollectorManager().putIfAbsent( - applicationId, collector); + rmContext.getRMTimelineCollectorManager().putIfAbsent(collector); } /** @@ -530,7 +529,8 @@ public void startTimelineCollector() { * used only if the timeline service v.2 is enabled. */ public void stopTimelineCollector() { - rmContext.getRMTimelineCollectorManager().remove(applicationId); + rmContext.getRMTimelineCollectorManager().remove( + CollectorData.toCollectorId(applicationId)); } @Override @@ -602,12 +602,12 @@ public void setQueue(String queue) { } @Override - public AppCollectorData getCollectorData() { + public CollectorData getCollectorData() { return this.collectorData; } @Override - public void setCollectorData(AppCollectorData incomingData) { + public void setCollectorData(CollectorData incomingData) { this.collectorData = incomingData; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java index 64c3749..61ea562 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -24,8 +24,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; @@ -49,55 +51,59 @@ public RMTimelineCollectorManager(RMContext rmContext) { } @Override - protected void doPostPut(ApplicationId appId, TimelineCollector collector) { - RMApp app = rmContext.getRMApps().get(appId); - if (app == null) { - throw new YarnRuntimeException( - "Unable to get the timeline collector context info for a " + - "non-existing app " + appId); - } - String userId = app.getUser(); - TimelineCollectorContext context = collector.getTimelineEntityContext(); - if (userId != null && !userId.isEmpty()) { - context.setUserId(userId); - } + protected void doPostPut(TimelineCollector collector) { + // For now, we only allow RMs to hold app level timeline collectors. + if (CollectorData.Type.APP_LEVEL.equals(collector.getType())) { + ApplicationId appId = ((AppLevelTimelineCollector) collector).getAppId(); + RMApp app = rmContext.getRMApps().get(appId); + if (app == null) { + throw new YarnRuntimeException( + "Unable to get the timeline collector context info for a " + + "non-existing app " + appId); + } + String userId = app.getUser(); + TimelineCollectorContext context = collector.getTimelineEntityContext(); + if (userId != null && !userId.isEmpty()) { + context.setUserId(userId); + } - // initialize the flow in the environment with default values for those - // that do not specify the flow tags - // flow name: app name (or app id if app name is missing), - // flow version: "1", flow run id: start time - context.setFlowName(TimelineUtils.generateDefaultFlowName( - app.getName(), appId)); - context.setFlowVersion(TimelineUtils.DEFAULT_FLOW_VERSION); - context.setFlowRunId(app.getStartTime()); + // initialize the flow in the environment with default values for those + // that do not specify the flow tags + // flow name: app name (or app id if app name is missing), + // flow version: "1", flow run id: start time + context.setFlowName(TimelineUtils.generateDefaultFlowName( + app.getName(), appId)); + context.setFlowVersion(TimelineUtils.DEFAULT_FLOW_VERSION); + context.setFlowRunId(app.getStartTime()); - // the flow context is received via the application tags - for (String tag : app.getApplicationTags()) { - String[] parts = tag.split(":", 2); - if (parts.length != 2 || parts[1].isEmpty()) { - continue; - } - switch (parts[0].toUpperCase()) { - case TimelineUtils.FLOW_NAME_TAG_PREFIX: - if (LOG.isDebugEnabled()) { - LOG.debug("Setting the flow name: " + parts[1]); - } - context.setFlowName(parts[1]); - break; - case TimelineUtils.FLOW_VERSION_TAG_PREFIX: - if (LOG.isDebugEnabled()) { - LOG.debug("Setting the flow version: " + parts[1]); + // the flow context is received via the application tags + for (String tag : app.getApplicationTags()) { + String[] parts = tag.split(":", 2); + if (parts.length != 2 || parts[1].isEmpty()) { + continue; } - context.setFlowVersion(parts[1]); - break; - case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX: - if (LOG.isDebugEnabled()) { - LOG.debug("Setting the flow run id: " + parts[1]); + switch (parts[0].toUpperCase()) { + case TimelineUtils.FLOW_NAME_TAG_PREFIX: + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow name: " + parts[1]); + } + context.setFlowName(parts[1]); + break; + case TimelineUtils.FLOW_VERSION_TAG_PREFIX: + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow version: " + parts[1]); + } + context.setFlowVersion(parts[1]); + break; + case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX: + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow run id: " + parts[1]); + } + context.setFlowRunId(Long.parseLong(parts[1])); + break; + default: + break; } - context.setFlowRunId(Long.parseLong(parts[1])); - break; - default: - break; } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index c1258b8..057921f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -62,7 +62,7 @@ private String version; private Map containerStats = new HashMap(); - private Map registeringCollectors + private Map registeringCollectors = new ConcurrentHashMap<>(); public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { @@ -122,11 +122,11 @@ public void containerIncreaseStatus(Container container) throws Exception { } public void addRegisteringCollector(ApplicationId appId, - AppCollectorData data) { - this.registeringCollectors.put(appId, data); + CollectorData data) { + this.registeringCollectors.put(CollectorData.toCollectorId(appId), data); } - public Map getRegisteringCollectors() { + public Map getRegisteringCollectors() { return this.registeringCollectors; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java index 4bbecf5..a0cf5bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java @@ -22,9 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.junit.Before; import org.junit.Test; @@ -58,14 +56,17 @@ public void testRebuildCollectorDataOnFailover() throws Exception { = new MockNM("127.0.0.1:5678", 15121, rm2.getResourceTrackerService()); RMApp app1 = rm1.submitApp(1024); String collectorAddr1 = "1.2.3.4:5"; - AppCollectorData data1 = AppCollectorData.newInstance( - app1.getApplicationId(), collectorAddr1); + CollectorData data1 = CollectorData.newInstance( + CollectorData.toCollectorId(app1.getApplicationId()), + collectorAddr1, CollectorData.Type.APP_LEVEL.name()); nm1.addRegisteringCollector(app1.getApplicationId(), data1); String collectorAddr2 = "5.4.3.2:1"; RMApp app2 = rm1.submitApp(1024); - AppCollectorData data2 = AppCollectorData.newInstance( - app2.getApplicationId(), collectorAddr2, rm1.getStartTime(), 1); + CollectorData data2 = CollectorData.newInstance( + CollectorData.toCollectorId(app2.getApplicationId()), + collectorAddr2, CollectorData.Type.APP_LEVEL.name(), + rm1.getStartTime(), 1); nm1.addRegisteringCollector(app2.getApplicationId(), data2); explicitFailover(); @@ -77,42 +78,48 @@ public void testRebuildCollectorDataOnFailover() throws Exception { nm2.registerNode(runningApps); String collectorAddr12 = "1.2.3.4:56"; - AppCollectorData data12 = AppCollectorData.newInstance( - app1.getApplicationId(), collectorAddr12, rm1.getStartTime(), 0); + CollectorData data12 = CollectorData.newInstance( + CollectorData.toCollectorId(app1.getApplicationId()), collectorAddr12, + CollectorData.Type.APP_LEVEL.name(), rm1.getStartTime(), 0); nm2.addRegisteringCollector(app1.getApplicationId(), data12); String collectorAddr22 = "5.4.3.2:10"; - AppCollectorData data22 = AppCollectorData.newInstance( - app2.getApplicationId(), collectorAddr22, rm1.getStartTime(), 2); + CollectorData data22 = CollectorData.newInstance( + CollectorData.toCollectorId(app2.getApplicationId()), collectorAddr22, + CollectorData.Type.APP_LEVEL.name(), rm1.getStartTime(), 2); nm2.addRegisteringCollector(app2.getApplicationId(), data22); - Map results1 - = nm1.nodeHeartbeat(true).getAppCollectorsMap(); + Map results1 + = nm1.nodeHeartbeat(true).getCollectorsMap(); assertEquals(collectorAddr1, - results1.get(app1.getApplicationId()).getCollectorAddr()); + results1.get(CollectorData.toCollectorId(app1.getApplicationId())) + .getCollectorAddr()); assertEquals(collectorAddr2, - results1.get(app2.getApplicationId()).getCollectorAddr()); + results1.get(CollectorData.toCollectorId(app2.getApplicationId())) + .getCollectorAddr()); - Map results2 - = nm2.nodeHeartbeat(true).getAppCollectorsMap(); + Map results2 + = nm2.nodeHeartbeat(true).getCollectorsMap(); // addr of app1 should be collectorAddr1 since it's registering (no time // stamp). assertEquals(collectorAddr1, - results2.get(app1.getApplicationId()).getCollectorAddr()); + results2.get(CollectorData.toCollectorId(app1.getApplicationId())) + .getCollectorAddr()); // addr of app2 should be collectorAddr22 since its version number is // greater. assertEquals(collectorAddr22, - results2.get(app2.getApplicationId()).getCollectorAddr()); + results2.get(CollectorData.toCollectorId(app2.getApplicationId())) + .getCollectorAddr()); // Now nm1 should get updated collector list nm1.getRegisteringCollectors().clear(); - Map results12 - = nm1.nodeHeartbeat(true).getAppCollectorsMap(); + Map results12 + = nm1.nodeHeartbeat(true).getCollectorsMap(); assertEquals(collectorAddr1, - results12.get(app1.getApplicationId()).getCollectorAddr()); + results12.get(CollectorData.toCollectorId(app1.getApplicationId())) + .getCollectorAddr()); assertEquals(collectorAddr22, - results12.get(app2.getApplicationId()).getCollectorAddr()); - - + results12.get(CollectorData.toCollectorId(app2.getApplicationId())) + .getCollectorAddr()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index b271b37..18b0db8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -1164,8 +1165,9 @@ public void testRMRestartTimelineCollectorContext() throws Exception { ApplicationId appId = app.getApplicationId(); TimelineCollectorContext contextBeforeRestart = - rm1.getRMContext().getRMTimelineCollectorManager().get(appId). - getTimelineEntityContext(); + rm1.getRMContext().getRMTimelineCollectorManager() + .get(CollectorData.toCollectorId(appId)) + .getTimelineEntityContext(); // Restart RM. rm2 = createMockRM(conf, memStore); @@ -1173,8 +1175,9 @@ public void testRMRestartTimelineCollectorContext() throws Exception { Assert.assertEquals(1, rm2.getRMContext().getRMApps().size()); rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); TimelineCollectorContext contextAfterRestart = - rm2.getRMContext().getRMTimelineCollectorManager().get(appId). - getTimelineEntityContext(); + rm2.getRMContext().getRMTimelineCollectorManager() + .get(CollectorData.toCollectorId(appId)) + .getTimelineEntityContext(); Assert.assertEquals("Collector contexts for an app should be same " + "across restarts", contextBeforeRestart, contextAfterRestart); } finally { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index fb7bfb0..cdd3754 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -61,7 +61,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -1006,21 +1006,25 @@ public void testNodeHeartbeatForAppCollectorsMap() throws Exception { RMApp app1 = rm.submitApp(1024); String collectorAddr1 = "1.2.3.4:5"; - app1.setCollectorData(AppCollectorData.newInstance( - app1.getApplicationId(), collectorAddr1)); + app1.setCollectorData(CollectorData.newInstance( + CollectorData.toCollectorId(app1.getApplicationId()), + collectorAddr1, CollectorData.Type.APP_LEVEL.name())); String collectorAddr2 = "5.4.3.2:1"; RMApp app2 = rm.submitApp(1024); - app2.setCollectorData(AppCollectorData.newInstance( - app2.getApplicationId(), collectorAddr2)); + app2.setCollectorData(CollectorData.newInstance( + CollectorData.toCollectorId(app2.getApplicationId()), + collectorAddr2, CollectorData.Type.APP_LEVEL.name())); String collectorAddr3 = "5.4.3.2:2"; - app2.setCollectorData(AppCollectorData.newInstance( - app2.getApplicationId(), collectorAddr3, 0, 1)); + app2.setCollectorData(CollectorData.newInstance( + CollectorData.toCollectorId(app2.getApplicationId()), + collectorAddr3, CollectorData.Type.APP_LEVEL.name(), 0, 1)); String collectorAddr4 = "5.4.3.2:3"; - app2.setCollectorData(AppCollectorData.newInstance( - app2.getApplicationId(), collectorAddr4, 1, 0)); + app2.setCollectorData(CollectorData.newInstance( + CollectorData.toCollectorId(app2.getApplicationId()), + collectorAddr4, CollectorData.Type.APP_LEVEL.name(), 1, 0)); // Create a running container for app1 running on nm1 ContainerId runningContainerId1 = BuilderUtils.newContainerId( @@ -1058,18 +1062,20 @@ public void testNodeHeartbeatForAppCollectorsMap() throws Exception { Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); nodeHeartbeat1 = nm1.nodeHeartbeat(true); - Map map1 - = nodeHeartbeat1.getAppCollectorsMap(); + Map map1 + = nodeHeartbeat1.getCollectorsMap(); Assert.assertEquals(1, map1.size()); Assert.assertEquals(collectorAddr1, - map1.get(app1.getApplicationId()).getCollectorAddr()); + map1.get(CollectorData.toCollectorId(app1.getApplicationId())) + .getCollectorAddr()); nodeHeartbeat2 = nm2.nodeHeartbeat(true); - Map map2 - = nodeHeartbeat2.getAppCollectorsMap(); + Map map2 + = nodeHeartbeat2.getCollectorsMap(); Assert.assertEquals(1, map2.size()); Assert.assertEquals(collectorAddr4, - map2.get(app2.getApplicationId()).getCollectorAddr()); + map2.get(CollectorData.toCollectorId(app2.getApplicationId())) + .getCollectorAddr()); } private void checkRebootedNMCount(MockRM rm2, int count) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 1d96b52..b9b10f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -41,7 +41,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -96,11 +96,11 @@ public StringBuilder getDiagnostics() { throw new UnsupportedOperationException("Not supported yet."); } @Override - public AppCollectorData getCollectorData() { + public CollectorData getCollectorData() { throw new UnsupportedOperationException("Not supported yet."); } @Override - public void setCollectorData(AppCollectorData collectorData) { + public void setCollectorData(CollectorData collectorData) { throw new UnsupportedOperationException("Not supported yet."); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index 3ea4714..63f0b50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -285,7 +285,7 @@ private RMApp createAppAndRegister(ApplicationId appId) { // some stuff which are currently taken care in RMAppImpl rmAppsMapInContext.putIfAbsent(appId, app); AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId); - rmTimelineCollectorManager.putIfAbsent(appId, collector); + rmTimelineCollectorManager.putIfAbsent(collector); return app; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 67d99b7..d92f962 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -304,7 +304,7 @@ public CallerContext getCallerContext() { } @Override - public AppCollectorData getCollectorData() { + public CollectorData getCollectorData() { throw new UnsupportedOperationException("Not supported yet."); } @@ -314,7 +314,7 @@ public void removeCollectorData() { } @Override - public void setCollectorData(AppCollectorData collectorData) { + public void setCollectorData(CollectorData collectorData) { throw new UnsupportedOperationException("Not supported yet."); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index d276269..0cd0269 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import java.util.HashSet; import java.util.Map; @@ -60,7 +61,8 @@ private ScheduledThreadPoolExecutor appAggregationExecutor; public AppLevelTimelineCollector(ApplicationId appId) { - super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); + super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString(), + CollectorData.toCollectorId(appId)); Preconditions.checkNotNull(appId, "AppId shouldn't be null"); this.appId = appId; context = new TimelineCollectorContext(); @@ -75,6 +77,11 @@ public AppLevelTimelineCollector(ApplicationId appId) { } @Override + public CollectorData.Type getType() { + return CollectorData.Type.APP_LEVEL; + } + + @Override protected void serviceInit(Configuration conf) throws Exception { context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID, YarnConfiguration.DEFAULT_RM_CLUSTER_ID)); @@ -121,6 +128,10 @@ public TimelineCollectorContext getTimelineEntityContext() { return entityTypesSkipAggregation; } + public ApplicationId getAppId() { + return this.appId; + } + private class AppLevelAggregator implements Runnable { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 0323d7b..ea7a93e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -33,7 +33,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.lib.StaticUserWebFilter; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -42,6 +41,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -87,15 +87,16 @@ protected void serviceStop() throws Exception { } @Override - protected void doPostPut(ApplicationId appId, TimelineCollector collector) { + protected void doPostPut(TimelineCollector collector) { try { // Get context info from NM - updateTimelineCollectorContext(appId, collector); + updateTimelineCollectorContext(collector); // Report to NM if a new collector is added. - reportNewCollectorToNM(appId); + reportNewCollectorToNM(collector); } catch (YarnException | IOException e) { // throw exception here as it cannot be used if failed communicate with NM - LOG.error("Failed to communicate with NM Collector Service for " + appId); + LOG.error("Failed to communicate with NM Collector Service for " + + collector.getId()); throw new YarnRuntimeException(e); } } @@ -144,51 +145,56 @@ private void startWebApp() { timelineRestServerBindAddress); } - private void reportNewCollectorToNM(ApplicationId appId) + private void reportNewCollectorToNM(TimelineCollector collector) throws YarnException, IOException { + String collectorId = collector.getId(); ReportNewCollectorInfoRequest request = - ReportNewCollectorInfoRequest.newInstance(appId, - this.timelineRestServerBindAddress); - LOG.info("Report a new collector for application: " + appId + - " to the NM Collector Service."); + ReportNewCollectorInfoRequest.newInstance(collectorId, + this.timelineRestServerBindAddress, collector.getType().name()); + LOG.info("Report a new collector " + collectorId + " with type " + + collector.getType().name() + " to the NM Collector Service."); getNMCollectorService().reportNewCollectorInfo(request); } - private void updateTimelineCollectorContext( - ApplicationId appId, TimelineCollector collector) + private void updateTimelineCollectorContext(TimelineCollector collector) throws YarnException, IOException { - GetTimelineCollectorContextRequest request = - GetTimelineCollectorContextRequest.newInstance(appId); - LOG.info("Get timeline collector context for " + appId); - GetTimelineCollectorContextResponse response = - getNMCollectorService().getTimelineCollectorContext(request); - String userId = response.getUserId(); - if (userId != null && !userId.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting the user in the context: " + userId); + // Only works for app level collectors. + if (CollectorData.Type.APP_LEVEL.equals(collector.getType())) { + String id = collector.getId(); + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance(id, + collector.getType().name()); + LOG.info("Get timeline collector context for " + id); + GetTimelineCollectorContextResponse response = + getNMCollectorService().getTimelineCollectorContext(request); + String userId = response.getUserId(); + if (userId != null && !userId.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the user in the context: " + userId); + } + collector.getTimelineEntityContext().setUserId(userId); } - collector.getTimelineEntityContext().setUserId(userId); - } - String flowName = response.getFlowName(); - if (flowName != null && !flowName.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting the flow name: " + flowName); + String flowName = response.getFlowName(); + if (flowName != null && !flowName.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow name: " + flowName); + } + collector.getTimelineEntityContext().setFlowName(flowName); } - collector.getTimelineEntityContext().setFlowName(flowName); - } - String flowVersion = response.getFlowVersion(); - if (flowVersion != null && !flowVersion.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting the flow version: " + flowVersion); + String flowVersion = response.getFlowVersion(); + if (flowVersion != null && !flowVersion.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow version: " + flowVersion); + } + collector.getTimelineEntityContext().setFlowVersion(flowVersion); } - collector.getTimelineEntityContext().setFlowVersion(flowVersion); - } - long flowRunId = response.getFlowRunId(); - if (flowRunId != 0L) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting the flow run id: " + flowRunId); + long flowRunId = response.getFlowRunId(); + if (flowRunId != 0L) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow run id: " + flowRunId); + } + collector.getTimelineEntityContext().setFlowRunId(flowRunId); } - collector.getTimelineEntityContext().setFlowRunId(flowRunId); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 041e7c2..3dee83d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.api.ContainerType; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.api.records.CollectorData; /** * The top-level server for the per-node timeline collector manager. Currently @@ -114,7 +115,7 @@ protected void serviceStop() throws Exception { public boolean addApplication(ApplicationId appId) { AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId); - return (collectorManager.putIfAbsent(appId, collector) + return (collectorManager.putIfAbsent(collector) == collector); } @@ -127,7 +128,7 @@ public boolean addApplication(ApplicationId appId) { * @return whether it was removed successfully */ public boolean removeApplication(ApplicationId appId) { - return collectorManager.remove(appId); + return collectorManager.remove(CollectorData.toCollectorId(appId)); } /** @@ -168,7 +169,8 @@ public void run() { @VisibleForTesting boolean hasApplication(ApplicationId appId) { - return collectorManager.containsTimelineCollector(appId); + return collectorManager.containsTimelineCollector( + CollectorData.toCollectorId(appId)); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 2fc3033..dc54aa1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; /** @@ -54,6 +55,7 @@ private static final Log LOG = LogFactory.getLog(TimelineCollector.class); public static final String SEPARATOR = "_"; + private String id; private TimelineWriter writer; private ConcurrentMap aggregationGroups = new ConcurrentHashMap<>(); @@ -62,10 +64,21 @@ private volatile boolean readyToAggregate = false; - public TimelineCollector(String name) { - super(name); + public TimelineCollector(String serviceName, String id) { + super(serviceName); + this.id = id; } + public String getId() { + return this.id; + } + + /** + * Describes the type of the timeline collector. + * @return the type of the timeline collector. + */ + public abstract CollectorData.Type getType(); + @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 9758320..d4f2ef9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -88,9 +88,8 @@ protected void serviceStart() throws Exception { } // access to this map is synchronized with the map itself - private final Map collectors = - Collections.synchronizedMap( - new HashMap()); + private final Map collectors = + Collections.synchronizedMap(new HashMap<>()); public TimelineCollectorManager(String name) { super(name); @@ -104,17 +103,16 @@ protected TimelineWriter getWriter() { * Put the collector into the collection if an collector mapped by id does * not exist. * - * @param appId Application Id for which collector needs to be put. * @param collector timeline collector to be put. * @throws YarnRuntimeException if there was any exception in initializing * and starting the app level service * @return the collector associated with id after the potential put. */ - public TimelineCollector putIfAbsent(ApplicationId appId, - TimelineCollector collector) { - TimelineCollector collectorInTable = null; + public TimelineCollector putIfAbsent(TimelineCollector collector) { + TimelineCollector collectorInTable; + String id = collector.getId(); synchronized (collectors) { - collectorInTable = collectors.get(appId); + collectorInTable = collectors.get(id); if (collectorInTable == null) { try { // initialize, start, and add it to the collection so it can be @@ -122,15 +120,15 @@ public TimelineCollector putIfAbsent(ApplicationId appId, collector.init(getConfig()); collector.setWriter(writer); collector.start(); - collectors.put(appId, collector); - LOG.info("the collector for " + appId + " was added"); + collectors.put(id, collector); + LOG.info("the collector for " + id + " was added"); collectorInTable = collector; - postPut(appId, collectorInTable); + postPut(collectorInTable); } catch (Exception e) { throw new YarnRuntimeException(e); } } else { - LOG.info("the collector for " + appId + " already exists!"); + LOG.info("the collector for " + id + " already exists!"); } } return collectorInTable; @@ -139,65 +137,63 @@ public TimelineCollector putIfAbsent(ApplicationId appId, /** * Callback handler for the timeline collector manager when a collector has * been added into the collector map. - * @param appId Application id of the collector. * @param collector The actual timeline collector that has been added. */ - public void postPut(ApplicationId appId, TimelineCollector collector) { - doPostPut(appId, collector); + public void postPut(TimelineCollector collector) { + doPostPut(collector); collector.setReadyToAggregate(); } /** * A template method that will be called by - * {@link #postPut(ApplicationId, TimelineCollector)}. - * @param appId Application id of the collector. + * {@link #postPut(TimelineCollector)}. * @param collector The actual timeline collector that has been added. */ - protected void doPostPut(ApplicationId appId, TimelineCollector collector) { + protected void doPostPut(TimelineCollector collector) { } /** * Removes the collector for the specified id. The collector is also stopped * as a result. If the collector does not exist, no change is made. * - * @param appId Application Id to remove. + * @param id Collector id to remove. * @return whether it was removed successfully */ - public boolean remove(ApplicationId appId) { - TimelineCollector collector = collectors.remove(appId); + public boolean remove(String id) { + TimelineCollector collector = collectors.remove(id); if (collector == null) { - LOG.error("the collector for " + appId + " does not exist!"); + LOG.error("the collector for " + id + " does not exist!"); } else { - postRemove(appId, collector); + postRemove(collector); // stop the service to do clean up collector.stop(); - LOG.info("The collector service for " + appId + " was removed"); + LOG.info("The collector service for " + id + " was removed"); } return collector != null; } - protected void postRemove(ApplicationId appId, TimelineCollector collector) { + protected void postRemove(TimelineCollector collector) { } /** * Returns the collector for the specified id. * - * @param appId Application Id for which we need to get the collector. + * @param id Id for which we need to get the collector. * @return the collector or null if it does not exist */ - public TimelineCollector get(ApplicationId appId) { - return collectors.get(appId); + public TimelineCollector get(String id) { + return collectors.get(id); } /** * Returns whether the collector for the specified id exists in this * collection. - * @param appId Application Id. + * @param id Collector id. * @return true if collector for the app id is found, false otherwise. */ - public boolean containsTimelineCollector(ApplicationId appId) { - return collectors.containsKey(appId); + public boolean containsTimelineCollector(String id) { + return collectors.containsKey(id); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 29ef1f8..e5790fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -161,7 +162,8 @@ public Response putEntities( NodeTimelineCollectorManager collectorManager = (NodeTimelineCollectorManager) context.getAttribute( NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY); - TimelineCollector collector = collectorManager.get(appID); + TimelineCollector collector = collectorManager.get( + CollectorData.toCollectorId(appID)); if (collector == null) { LOG.error("Application: "+ appId + " is not found"); throw new NotFoundException(); // different exception? diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index 7bc89c5..13f6a73 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.records.CollectorData; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.After; @@ -96,7 +97,7 @@ public void testMultithreadedAdd() throws Exception { public Boolean call() { AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId); - return (collectorManager.putIfAbsent(appId, collector) == collector); + return (collectorManager.putIfAbsent(collector) == collector); } }; tasks.add(task); @@ -113,7 +114,8 @@ public Boolean call() { // check the keys for (int i = 0; i < numApps; i++) { final ApplicationId appId = ApplicationId.newInstance(0L, i); - assertTrue(collectorManager.containsTimelineCollector(appId)); + assertTrue(collectorManager.containsTimelineCollector( + CollectorData.toCollectorId(appId))); } } @@ -128,8 +130,9 @@ public Boolean call() { AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId); boolean successPut = - (collectorManager.putIfAbsent(appId, collector) == collector); - return successPut && collectorManager.remove(appId); + (collectorManager.putIfAbsent(collector) == collector); + return successPut + && collectorManager.remove(CollectorData.toCollectorId(appId)); } }; tasks.add(task); @@ -146,7 +149,8 @@ public Boolean call() { // check the keys for (int i = 0; i < numApps; i++) { final ApplicationId appId = ApplicationId.newInstance(0L, i); - assertFalse(collectorManager.containsTimelineCollector(appId)); + assertFalse(collectorManager.containsTimelineCollector( + CollectorData.toCollectorId(appId))); } }